AMQ-9243 - Remove deprecated jetty-continuation dependency (#998)

This commit removes the dependency on jetty-continuation and updates the
MessageServlet and AjaxListener servlets to use the Servlet Async api
directly for async requests through a new Async holder object that is
used to track/manage the request.

(cherry picked from commit 905f00c843)
This commit is contained in:
Christopher L. Shannon 2023-04-13 07:49:22 -04:00 committed by Christopher L. Shannon (cshannon)
parent f63328048f
commit add6356261
7 changed files with 261 additions and 90 deletions

View File

@ -211,10 +211,6 @@
<groupId>org.eclipse.jetty.websocket</groupId> <groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId> <artifactId>websocket-server</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -59,6 +59,40 @@ public class RestTest extends JettyTestSupport {
assertEquals("test", buf.toString()); assertEquals("test", buf.toString());
} }
@Test(timeout = 60 * 1000)
public void testConsumeAsync() throws Exception {
int port = getPort();
HttpClient httpClient = new HttpClient();
httpClient.start();
final StringBuffer buf = new StringBuffer();
final CountDownLatch latch =
asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=5000&type=queue", buf);
//Sleep 2 seconds before sending, should still get the response as timeout is 5 seconds
Thread.sleep(2000);
producer.send(session.createTextMessage("test"));
LOG.info("message sent");
latch.await();
assertEquals("test", buf.toString());
}
@Test(timeout = 60 * 1000)
public void testConsumeAsyncTimeout() throws Exception {
int port = getPort();
HttpClient httpClient = new HttpClient();
httpClient.start();
final StringBuffer buf = new StringBuffer();
final CountDownLatch latch =
asyncRequest(httpClient, "http://localhost:" + port + "/message/test?readTimeout=1000&type=queue", buf);
//Test timeout, no message was sent
latch.await();
assertTrue(buf.toString().contains("AsyncContext timeout"));
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testSubscribeFirst() throws Exception { public void testSubscribeFirst() throws Exception {
int port = getPort(); int port = getPort();

View File

@ -94,11 +94,6 @@
<groupId>org.eclipse.jetty.websocket</groupId> <groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId> <artifactId>websocket-server</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<version>${jetty-version}</version>
</dependency>
<!-- Rome RSS Reader --> <!-- Rome RSS Reader -->
<dependency> <dependency>

View File

@ -17,17 +17,15 @@
package org.apache.activemq.web; package org.apache.activemq.web;
import java.util.LinkedList; import java.util.LinkedList;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import org.apache.activemq.MessageAvailableListener; import org.apache.activemq.MessageAvailableListener;
import org.eclipse.jetty.continuation.Continuation; import org.apache.activemq.web.async.AsyncServletRequest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/* /*
* Listen for available messages and wakeup any continuations. * Listen for available messages and wakeup any asyncRequests.
*/ */
public class AjaxListener implements MessageAvailableListener { public class AjaxListener implements MessageAvailableListener {
private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class); private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
@ -35,7 +33,7 @@ public class AjaxListener implements MessageAvailableListener {
private final long maximumReadTimeout; private final long maximumReadTimeout;
private final AjaxWebClient client; private final AjaxWebClient client;
private long lastAccess; private long lastAccess;
private Continuation continuation; private AsyncServletRequest asyncRequest;
private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>(); private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) { AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
@ -48,8 +46,8 @@ public class AjaxListener implements MessageAvailableListener {
lastAccess = System.currentTimeMillis(); lastAccess = System.currentTimeMillis();
} }
public synchronized void setContinuation(Continuation continuation) { public synchronized void setAsyncRequest(AsyncServletRequest asyncRequest) {
this.continuation = continuation; this.asyncRequest = asyncRequest;
} }
public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() { public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
@ -58,19 +56,19 @@ public class AjaxListener implements MessageAvailableListener {
@Override @Override
public synchronized void onMessageAvailable(MessageConsumer consumer) { public synchronized void onMessageAvailable(MessageConsumer consumer) {
LOG.debug("Message for consumer: {} continuation: {}", consumer, continuation); LOG.debug("Message for consumer: {} asyncRequest: {}", consumer, asyncRequest);
if (continuation != null) { if (asyncRequest != null) {
try { try {
Message message = consumer.receive(10); Message message = consumer.receive(10);
LOG.debug("message is " + message); LOG.debug("message is " + message);
if (message != null) { if (message != null) {
if (!continuation.isResumed()) { if (!asyncRequest.isDispatched()) {
LOG.debug("Resuming suspended continuation {}", continuation); LOG.debug("Resuming suspended asyncRequest {}", asyncRequest);
continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer)); asyncRequest.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer));
continuation.resume(); asyncRequest.dispatch();
} else { } else {
LOG.debug("Message available, but continuation is already resumed. Buffer for next time."); LOG.debug("Message available, but asyncRequest is already resumed. Buffer for next time.");
bufferMessageForDelivery(message, consumer); bufferMessageForDelivery(message, consumer);
} }
} }

View File

@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -38,11 +37,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession; import javax.servlet.http.HttpSession;
import org.apache.activemq.MessageAvailableConsumer; import org.apache.activemq.MessageAvailableConsumer;
import org.eclipse.jetty.continuation.Continuation; import org.apache.activemq.web.async.AsyncServletRequest;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -272,7 +268,7 @@ public class MessageListenerServlet extends MessageServletSupport {
LOG.debug("doMessage timeout=" + timeout); LOG.debug("doMessage timeout=" + timeout);
} }
// this is non-null if we're resuming the continuation. // this is non-null if we're resuming the asyncRequest.
// attributes set in AjaxListener // attributes set in AjaxListener
UndeliveredAjaxMessage undelivered_message = null; UndeliveredAjaxMessage undelivered_message = null;
Message message = null; Message message = null;
@ -310,28 +306,9 @@ public class MessageListenerServlet extends MessageServletSupport {
response.setHeader("Cache-Control", "no-cache"); response.setHeader("Cache-Control", "no-cache");
if (message == null && client.getListener().getUndeliveredMessages().size() == 0) { if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request); final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request);
// Add a listener to the continuation to make sure it actually if (asyncRequest.isExpired()) {
// will expire (seems like a bug in Jetty Servlet 3 continuations,
// see https://issues.apache.org/jira/browse/AMQ-3447
continuation.addContinuationListener(new ContinuationListener() {
@Override
public void onTimeout(Continuation cont) {
if (LOG.isDebugEnabled()) {
LOG.debug("Continuation " + cont.toString() + " expired.");
}
}
@Override
public void onComplete(Continuation cont) {
if (LOG.isDebugEnabled()) {
LOG.debug("Continuation " + cont.toString() + " completed.");
}
}
});
if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK); response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter(); StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter); PrintWriter writer = new PrintWriter(swriter);
@ -345,16 +322,16 @@ public class MessageListenerServlet extends MessageServletSupport {
return; return;
} }
continuation.setTimeout(timeout); asyncRequest.setTimeoutMs(timeout);
continuation.suspend(); asyncRequest.startAsync();
LOG.debug( "Suspending continuation " + continuation ); LOG.debug("Suspending asyncRequest " + asyncRequest);
// Fetch the listeners // Fetch the listeners
AjaxListener listener = client.getListener(); AjaxListener listener = client.getListener();
listener.access(); listener.access();
// register this continuation with our listener. // register this asyncRequest with our listener.
listener.setContinuation(continuation); listener.setAsyncRequest(asyncRequest);
return; return;
} }
@ -377,7 +354,7 @@ public class MessageListenerServlet extends MessageServletSupport {
messages++; messages++;
} }
// send messages buffered while continuation was unavailable. // send messages buffered while asyncRequest was unavailable.
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages(); LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages"); LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
synchronized( undeliveredMessages ) { synchronized( undeliveredMessages ) {

View File

@ -17,24 +17,27 @@
package org.apache.activemq.web; package org.apache.activemq.web;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.MessageAvailableConsumer;
import org.apache.activemq.MessageAvailableListener;
import org.apache.activemq.web.async.AsyncServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A servlet for sending and receiving messages to/from JMS destinations using * A servlet for sending and receiving messages to/from JMS destinations using
@ -53,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class MessageServlet extends MessageServletSupport { public class MessageServlet extends MessageServletSupport {
// its a bit pita that this servlet got intermixed with jetty continuation/rest // its a bit pita that this servlet got intermixed with asyncRequest/rest
// instead of creating a special for that. We should have kept a simple servlet // instead of creating a special for that. We should have kept a simple servlet
// for good old fashioned request/response blocked communication. // for good old fashioned request/response blocked communication.
@ -185,11 +188,11 @@ public class MessageServlet extends MessageServletSupport {
throw new NoDestinationSuppliedException(); throw new NoDestinationSuppliedException();
} }
consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName)); consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName));
Continuation continuation = ContinuationSupport.getContinuation(request); final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request);
// Don't allow concurrent use of the consumer. Do make sure to allow // Don't allow concurrent use of the consumer. Do make sure to allow
// subsequent calls on continuation to use the consumer. // subsequent calls on asyncRequest to use the consumer.
if (continuation.isInitial() && !activeConsumers.add(consumer)) { if (asyncRequest.isInitial() && !activeConsumers.add(consumer)) {
throw new ServletException("Concurrent access to consumer is not supported"); throw new ServletException("Concurrent access to consumer is not supported");
} }
@ -224,7 +227,7 @@ public class MessageServlet extends MessageServletSupport {
} }
if (message == null) { if (message == null) {
handleContinuation(request, response, client, destination, consumer, deadline); handleAsyncRequest(request, response, client, destination, consumer, deadline);
} else { } else {
writeResponse(request, response, message); writeResponse(request, response, message);
closeConsumerOnOneShot(request, client, destination); closeConsumerOnOneShot(request, client, destination);
@ -236,19 +239,19 @@ public class MessageServlet extends MessageServletSupport {
} }
} }
protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination, protected void handleAsyncRequest(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination,
MessageAvailableConsumer consumer, long deadline) { MessageAvailableConsumer consumer, long deadline) {
// Get an existing Continuation or create a new one if there are no events. // Get an existing ActiveMQAsyncRequest or create a new one if there are no events.
Continuation continuation = ContinuationSupport.getContinuation(request); final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request);
long timeout = deadline - System.currentTimeMillis(); long timeout = deadline - System.currentTimeMillis();
if ((continuation.isExpired()) || (timeout <= 0)) { if ((asyncRequest.isExpired()) || (timeout <= 0)) {
// Reset the continuation on the available listener for the consumer to prevent the // Reset the asyncRequest on the available listener for the consumer to prevent the
// next message receipt from being consumed without a valid, active continuation. // next message receipt from being consumed without a valid, active asyncRequest.
synchronized (consumer) { synchronized (consumer) {
Object obj = consumer.getAvailableListener(); Object obj = consumer.getAvailableListener();
if (obj instanceof Listener) { if (obj instanceof Listener) {
((Listener) obj).setContinuation(null); ((Listener) obj).setAsyncRequest(null);
} }
} }
response.setStatus(HttpServletResponse.SC_NO_CONTENT); response.setStatus(HttpServletResponse.SC_NO_CONTENT);
@ -257,14 +260,14 @@ public class MessageServlet extends MessageServletSupport {
return; return;
} }
continuation.setTimeout(timeout); asyncRequest.setTimeoutMs(timeout);
continuation.suspend(); asyncRequest.startAsync();
synchronized (consumer) { synchronized (consumer) {
Listener listener = (Listener) consumer.getAvailableListener(); Listener listener = (Listener) consumer.getAvailableListener();
// register this continuation with our listener. // register this asyncRequest with our listener.
listener.setContinuation(continuation); listener.setAsyncRequest(asyncRequest);
} }
} }
@ -421,19 +424,19 @@ public class MessageServlet extends MessageServletSupport {
} }
/* /*
* Listen for available messages and wakeup any continuations. * Listen for available messages and wakeup any asyncRequests.
*/ */
private static class Listener implements MessageAvailableListener { private static class Listener implements MessageAvailableListener {
MessageConsumer consumer; MessageConsumer consumer;
Continuation continuation; AsyncServletRequest asyncRequest;
Listener(MessageConsumer consumer) { Listener(MessageConsumer consumer) {
this.consumer = consumer; this.consumer = consumer;
} }
public void setContinuation(Continuation continuation) { public void setAsyncRequest(AsyncServletRequest asyncRequest) {
synchronized (consumer) { synchronized (consumer) {
this.continuation = continuation; this.asyncRequest = asyncRequest;
} }
} }
@ -444,8 +447,8 @@ public class MessageServlet extends MessageServletSupport {
((MessageAvailableConsumer) consumer).setAvailableListener(null); ((MessageAvailableConsumer) consumer).setAvailableListener(null);
synchronized (this.consumer) { synchronized (this.consumer) {
if (continuation != null) { if (asyncRequest != null) {
continuation.resume(); asyncRequest.dispatch();
} }
} }
} }

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.web.async;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.DispatcherType;
import javax.servlet.ServletRequest;
import javax.servlet.ServletRequestWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper object to hold and track Async servlet requests. This is
* a replacement for the deprecated/removed Jetty Continuation
* API as that has long been replaced by the Servlet Async api.
*
*/
public class AsyncServletRequest implements AsyncListener {
private static final Logger LOG = LoggerFactory.getLogger(AsyncServletRequest.class);
private static final String ACTIVEMQ_ASYNC_SERVLET_REQUEST = "activemq.async.servlet.request";
private final ServletRequest request;
private final AtomicReference<AsyncContext> contextRef = new AtomicReference<>();
private final AtomicBoolean dispatched = new AtomicBoolean();
private final AtomicBoolean expired = new AtomicBoolean();
private final AtomicLong timeoutMs = new AtomicLong(-1);
public AsyncServletRequest(ServletRequest request) {
this.request = request;
}
public void complete() {
final AsyncContext context = getContext();
context.complete();
}
public void startAsync() {
//Reset previous state
this.dispatched.set(false);
this.expired.set(false);
final AsyncContext context = request.startAsync();
contextRef.set(context);
context.setTimeout(timeoutMs.get());
context.addListener(this);
}
public void dispatch() {
final AsyncContext context = getContext();
this.dispatched.set(true);
context.dispatch();
}
public void setAttribute(String name, Object attribute) {
request.setAttribute(name, attribute);
}
public void setTimeoutMs(long timeoutMs) {
this.timeoutMs.set(timeoutMs);
}
public boolean isInitial() {
return this.request.getDispatcherType() != DispatcherType.ASYNC;
}
public boolean isExpired() {
return this.expired.get();
}
public boolean isDispatched() {
return dispatched.get();
}
public AsyncContext getAsyncContext() {
return contextRef.get();
}
@Override
public void onComplete(AsyncEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("ActiveMQAsyncRequest " + event + " completed.");
}
}
@Override
public void onTimeout(AsyncEvent event) {
this.expired.set(true);
if (LOG.isDebugEnabled()) {
LOG.debug("ActiveMQAsyncRequest " + event + " timeout.");
}
}
@Override
public void onError(AsyncEvent event) {
final Throwable error = event.getThrowable();
if (error != null) {
LOG.warn("ActiveMQAsyncRequest " + event + " error: {}", error.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug(error.getMessage(), error);
}
}
}
@Override
public void onStartAsync(AsyncEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("ActiveMQAsyncRequest " + event + " async started.");
}
}
private AsyncContext getContext() {
final AsyncContext context = this.contextRef.get();
if (context == null) {
throw new IllegalStateException("Async request has not been started.");
}
return context;
}
/**
* Look up the existing async request or create/ store a new request that be referenced later
* @param request the ServletRequest
* @return the existing or new ActiveMQAsyncRequest
*/
public static AsyncServletRequest getAsyncRequest(final ServletRequest request) {
Objects.requireNonNull(request, "ServletRequest must not be null");
return Optional.ofNullable(request.getAttribute(ACTIVEMQ_ASYNC_SERVLET_REQUEST))
.map(sr -> (AsyncServletRequest)sr).orElseGet(() -> {
final AsyncServletRequest asyncRequest = new AsyncServletRequest(unwrap(request));
request.setAttribute(ACTIVEMQ_ASYNC_SERVLET_REQUEST, asyncRequest);
return asyncRequest;
});
}
private static ServletRequest unwrap(ServletRequest request) {
Objects.requireNonNull(request, "ServletRequest must not be null");
//If it's a wrapper object then unwrap to get the source request
while (request instanceof ServletRequestWrapper) {
request = ((ServletRequestWrapper)request).getRequest();
}
return request;
}
}