diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml index 0710f7deca..4bf0c98545 100644 --- a/activemq-web-console/pom.xml +++ b/activemq-web-console/pom.xml @@ -211,10 +211,6 @@ org.eclipse.jetty.websocket websocket-server - - org.eclipse.jetty - jetty-continuation - diff --git a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java index 872f501b78..9319601e90 100644 --- a/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java +++ b/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java @@ -59,6 +59,40 @@ public class RestTest extends JettyTestSupport { assertEquals("test", buf.toString()); } + @Test(timeout = 60 * 1000) + public void testConsumeAsync() throws Exception { + int port = getPort(); + HttpClient httpClient = new HttpClient(); + httpClient.start(); + + final StringBuffer buf = new StringBuffer(); + final CountDownLatch latch = + asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=5000&type=queue", buf); + + //Sleep 2 seconds before sending, should still get the response as timeout is 5 seconds + Thread.sleep(2000); + producer.send(session.createTextMessage("test")); + LOG.info("message sent"); + + latch.await(); + assertEquals("test", buf.toString()); + } + + @Test(timeout = 60 * 1000) + public void testConsumeAsyncTimeout() throws Exception { + int port = getPort(); + HttpClient httpClient = new HttpClient(); + httpClient.start(); + + final StringBuffer buf = new StringBuffer(); + final CountDownLatch latch = + asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1000&type=queue", buf); + + //Test timeout, no message was sent + latch.await(); + assertTrue(buf.toString().contains("AsyncContext timeout")); + } + @Test(timeout = 60 * 1000) public void testSubscribeFirst() throws Exception { int port = getPort(); diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml index 0aba34c174..f05498f9ab 100644 --- a/activemq-web/pom.xml +++ b/activemq-web/pom.xml @@ -94,11 +94,6 @@ org.eclipse.jetty.websocket websocket-server - - org.eclipse.jetty - jetty-continuation - ${jetty-version} - diff --git a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java index 6d355c2d5a..3a149f3bfd 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java @@ -17,17 +17,15 @@ package org.apache.activemq.web; import java.util.LinkedList; - import javax.jms.Message; import javax.jms.MessageConsumer; - import org.apache.activemq.MessageAvailableListener; -import org.eclipse.jetty.continuation.Continuation; +import org.apache.activemq.web.async.AsyncServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* - * Listen for available messages and wakeup any continuations. + * Listen for available messages and wakeup any asyncRequests. */ public class AjaxListener implements MessageAvailableListener { private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class); @@ -35,7 +33,7 @@ public class AjaxListener implements MessageAvailableListener { private final long maximumReadTimeout; private final AjaxWebClient client; private long lastAccess; - private Continuation continuation; + private AsyncServletRequest asyncRequest; private final LinkedList undeliveredMessages = new LinkedList(); AjaxListener(AjaxWebClient client, long maximumReadTimeout) { @@ -48,8 +46,8 @@ public class AjaxListener implements MessageAvailableListener { lastAccess = System.currentTimeMillis(); } - public synchronized void setContinuation(Continuation continuation) { - this.continuation = continuation; + public synchronized void setAsyncRequest(AsyncServletRequest asyncRequest) { + this.asyncRequest = asyncRequest; } public LinkedList getUndeliveredMessages() { @@ -58,19 +56,19 @@ public class AjaxListener implements MessageAvailableListener { @Override public synchronized void onMessageAvailable(MessageConsumer consumer) { - LOG.debug("Message for consumer: {} continuation: {}", consumer, continuation); + LOG.debug("Message for consumer: {} asyncRequest: {}", consumer, asyncRequest); - if (continuation != null) { + if (asyncRequest != null) { try { Message message = consumer.receive(10); LOG.debug("message is " + message); if (message != null) { - if (!continuation.isResumed()) { - LOG.debug("Resuming suspended continuation {}", continuation); - continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer)); - continuation.resume(); + if (!asyncRequest.isDispatched()) { + LOG.debug("Resuming suspended asyncRequest {}", asyncRequest); + asyncRequest.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer)); + asyncRequest.dispatch(); } else { - LOG.debug("Message available, but continuation is already resumed. Buffer for next time."); + LOG.debug("Message available, but asyncRequest is already resumed. Buffer for next time."); bufferMessageForDelivery(message, consumer); } } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java index cb4cc3c822..c3ecd7f7ac 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; - import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -38,11 +37,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; - import org.apache.activemq.MessageAvailableConsumer; -import org.eclipse.jetty.continuation.Continuation; -import org.eclipse.jetty.continuation.ContinuationListener; -import org.eclipse.jetty.continuation.ContinuationSupport; +import org.apache.activemq.web.async.AsyncServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -272,7 +268,7 @@ public class MessageListenerServlet extends MessageServletSupport { LOG.debug("doMessage timeout=" + timeout); } - // this is non-null if we're resuming the continuation. + // this is non-null if we're resuming the asyncRequest. // attributes set in AjaxListener UndeliveredAjaxMessage undelivered_message = null; Message message = null; @@ -310,28 +306,9 @@ public class MessageListenerServlet extends MessageServletSupport { response.setHeader("Cache-Control", "no-cache"); if (message == null && client.getListener().getUndeliveredMessages().size() == 0) { - Continuation continuation = ContinuationSupport.getContinuation(request); + final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request); - // Add a listener to the continuation to make sure it actually - // will expire (seems like a bug in Jetty Servlet 3 continuations, - // see https://issues.apache.org/jira/browse/AMQ-3447 - continuation.addContinuationListener(new ContinuationListener() { - @Override - public void onTimeout(Continuation cont) { - if (LOG.isDebugEnabled()) { - LOG.debug("Continuation " + cont.toString() + " expired."); - } - } - - @Override - public void onComplete(Continuation cont) { - if (LOG.isDebugEnabled()) { - LOG.debug("Continuation " + cont.toString() + " completed."); - } - } - }); - - if (continuation.isExpired()) { + if (asyncRequest.isExpired()) { response.setStatus(HttpServletResponse.SC_OK); StringWriter swriter = new StringWriter(); PrintWriter writer = new PrintWriter(swriter); @@ -345,16 +322,16 @@ public class MessageListenerServlet extends MessageServletSupport { return; } - continuation.setTimeout(timeout); - continuation.suspend(); - LOG.debug( "Suspending continuation " + continuation ); + asyncRequest.setTimeoutMs(timeout); + asyncRequest.startAsync(); + LOG.debug("Suspending asyncRequest " + asyncRequest); // Fetch the listeners AjaxListener listener = client.getListener(); listener.access(); - // register this continuation with our listener. - listener.setContinuation(continuation); + // register this asyncRequest with our listener. + listener.setAsyncRequest(asyncRequest); return; } @@ -377,7 +354,7 @@ public class MessageListenerServlet extends MessageServletSupport { messages++; } - // send messages buffered while continuation was unavailable. + // send messages buffered while asyncRequest was unavailable. LinkedList undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages(); LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages"); synchronized( undeliveredMessages ) { diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index 2afd2ab54e..b40b50f22f 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -17,24 +17,27 @@ package org.apache.activemq.web; -import org.apache.activemq.MessageAvailableConsumer; -import org.apache.activemq.MessageAvailableListener; -import org.eclipse.jetty.continuation.Continuation; -import org.eclipse.jetty.continuation.ContinuationSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.*; -import javax.servlet.ServletConfig; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Enumeration; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.activemq.MessageAvailableConsumer; +import org.apache.activemq.MessageAvailableListener; +import org.apache.activemq.web.async.AsyncServletRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A servlet for sending and receiving messages to/from JMS destinations using @@ -53,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class MessageServlet extends MessageServletSupport { - // its a bit pita that this servlet got intermixed with jetty continuation/rest + // its a bit pita that this servlet got intermixed with asyncRequest/rest // instead of creating a special for that. We should have kept a simple servlet // for good old fashioned request/response blocked communication. @@ -185,11 +188,11 @@ public class MessageServlet extends MessageServletSupport { throw new NoDestinationSuppliedException(); } consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName)); - Continuation continuation = ContinuationSupport.getContinuation(request); + final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request); // Don't allow concurrent use of the consumer. Do make sure to allow - // subsequent calls on continuation to use the consumer. - if (continuation.isInitial() && !activeConsumers.add(consumer)) { + // subsequent calls on asyncRequest to use the consumer. + if (asyncRequest.isInitial() && !activeConsumers.add(consumer)) { throw new ServletException("Concurrent access to consumer is not supported"); } @@ -224,7 +227,7 @@ public class MessageServlet extends MessageServletSupport { } if (message == null) { - handleContinuation(request, response, client, destination, consumer, deadline); + handleAsyncRequest(request, response, client, destination, consumer, deadline); } else { writeResponse(request, response, message); closeConsumerOnOneShot(request, client, destination); @@ -236,19 +239,19 @@ public class MessageServlet extends MessageServletSupport { } } - protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination, + protected void handleAsyncRequest(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination, MessageAvailableConsumer consumer, long deadline) { - // Get an existing Continuation or create a new one if there are no events. - Continuation continuation = ContinuationSupport.getContinuation(request); + // Get an existing ActiveMQAsyncRequest or create a new one if there are no events. + final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request); long timeout = deadline - System.currentTimeMillis(); - if ((continuation.isExpired()) || (timeout <= 0)) { - // Reset the continuation on the available listener for the consumer to prevent the - // next message receipt from being consumed without a valid, active continuation. + if ((asyncRequest.isExpired()) || (timeout <= 0)) { + // Reset the asyncRequest on the available listener for the consumer to prevent the + // next message receipt from being consumed without a valid, active asyncRequest. synchronized (consumer) { Object obj = consumer.getAvailableListener(); if (obj instanceof Listener) { - ((Listener) obj).setContinuation(null); + ((Listener) obj).setAsyncRequest(null); } } response.setStatus(HttpServletResponse.SC_NO_CONTENT); @@ -257,14 +260,14 @@ public class MessageServlet extends MessageServletSupport { return; } - continuation.setTimeout(timeout); - continuation.suspend(); + asyncRequest.setTimeoutMs(timeout); + asyncRequest.startAsync(); synchronized (consumer) { Listener listener = (Listener) consumer.getAvailableListener(); - // register this continuation with our listener. - listener.setContinuation(continuation); + // register this asyncRequest with our listener. + listener.setAsyncRequest(asyncRequest); } } @@ -421,19 +424,19 @@ public class MessageServlet extends MessageServletSupport { } /* - * Listen for available messages and wakeup any continuations. + * Listen for available messages and wakeup any asyncRequests. */ private static class Listener implements MessageAvailableListener { MessageConsumer consumer; - Continuation continuation; + AsyncServletRequest asyncRequest; Listener(MessageConsumer consumer) { this.consumer = consumer; } - public void setContinuation(Continuation continuation) { + public void setAsyncRequest(AsyncServletRequest asyncRequest) { synchronized (consumer) { - this.continuation = continuation; + this.asyncRequest = asyncRequest; } } @@ -444,8 +447,8 @@ public class MessageServlet extends MessageServletSupport { ((MessageAvailableConsumer) consumer).setAvailableListener(null); synchronized (this.consumer) { - if (continuation != null) { - continuation.resume(); + if (asyncRequest != null) { + asyncRequest.dispatch(); } } } diff --git a/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java new file mode 100644 index 0000000000..79aa3fb06f --- /dev/null +++ b/activemq-web/src/main/java/org/apache/activemq/web/async/AsyncServletRequest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.web.async; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.DispatcherType; +import javax.servlet.ServletRequest; +import javax.servlet.ServletRequestWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper object to hold and track Async servlet requests. This is + * a replacement for the deprecated/removed Jetty Continuation + * API as that has long been replaced by the Servlet Async api. + * + */ +public class AsyncServletRequest implements AsyncListener { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncServletRequest.class); + + private static final String ACTIVEMQ_ASYNC_SERVLET_REQUEST = "activemq.async.servlet.request"; + + private final ServletRequest request; + private final AtomicReference contextRef = new AtomicReference<>(); + private final AtomicBoolean dispatched = new AtomicBoolean(); + private final AtomicBoolean expired = new AtomicBoolean(); + private final AtomicLong timeoutMs = new AtomicLong(-1); + + public AsyncServletRequest(ServletRequest request) { + this.request = request; + } + + public void complete() { + final AsyncContext context = getContext(); + context.complete(); + } + + public void startAsync() { + //Reset previous state + this.dispatched.set(false); + this.expired.set(false); + + final AsyncContext context = request.startAsync(); + contextRef.set(context); + context.setTimeout(timeoutMs.get()); + context.addListener(this); + } + + public void dispatch() { + final AsyncContext context = getContext(); + this.dispatched.set(true); + context.dispatch(); + } + + public void setAttribute(String name, Object attribute) { + request.setAttribute(name, attribute); + } + + public void setTimeoutMs(long timeoutMs) { + this.timeoutMs.set(timeoutMs); + } + + public boolean isInitial() { + return this.request.getDispatcherType() != DispatcherType.ASYNC; + } + + public boolean isExpired() { + return this.expired.get(); + } + + public boolean isDispatched() { + return dispatched.get(); + } + + public AsyncContext getAsyncContext() { + return contextRef.get(); + } + + @Override + public void onComplete(AsyncEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("ActiveMQAsyncRequest " + event + " completed."); + } + } + + @Override + public void onTimeout(AsyncEvent event) { + this.expired.set(true); + if (LOG.isDebugEnabled()) { + LOG.debug("ActiveMQAsyncRequest " + event + " timeout."); + } + } + + @Override + public void onError(AsyncEvent event) { + final Throwable error = event.getThrowable(); + if (error != null) { + LOG.warn("ActiveMQAsyncRequest " + event + " error: {}", error.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(error.getMessage(), error); + } + } + } + + @Override + public void onStartAsync(AsyncEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("ActiveMQAsyncRequest " + event + " async started."); + } + } + + private AsyncContext getContext() { + final AsyncContext context = this.contextRef.get(); + if (context == null) { + throw new IllegalStateException("Async request has not been started."); + } + return context; + } + + /** + * Look up the existing async request or create/ store a new request that be referenced later + * @param request the ServletRequest + * @return the existing or new ActiveMQAsyncRequest + */ + public static AsyncServletRequest getAsyncRequest(final ServletRequest request) { + Objects.requireNonNull(request, "ServletRequest must not be null"); + + return Optional.ofNullable(request.getAttribute(ACTIVEMQ_ASYNC_SERVLET_REQUEST)) + .map(sr -> (AsyncServletRequest)sr).orElseGet(() -> { + final AsyncServletRequest asyncRequest = new AsyncServletRequest(unwrap(request)); + request.setAttribute(ACTIVEMQ_ASYNC_SERVLET_REQUEST, asyncRequest); + return asyncRequest; + }); + } + + private static ServletRequest unwrap(ServletRequest request) { + Objects.requireNonNull(request, "ServletRequest must not be null"); + + //If it's a wrapper object then unwrap to get the source request + while (request instanceof ServletRequestWrapper) { + request = ((ServletRequestWrapper)request).getRequest(); + } + + return request; + } +}