Fix timeout errors on Jetty shutdown

With the new Jetty updates the shutdown is being blocked by any calls to
open BlockingQueueTransports that are waiting on queue.poll(). This
commit sends a shutdown info packet to unblock the poll() and also
increases the timeout to something more reasonable such as 30 seconds.
This commit is contained in:
Christopher L. Shannon (cshannon) 2023-06-19 18:07:19 -04:00 committed by Matt Pavlovich
parent f1c7b7350f
commit ef0b6dd0f2
4 changed files with 22 additions and 6 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.http.BlockingQueueTransport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
@ -144,8 +145,7 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory {
connector = new ServerConnector(server, factory, httpConnectionFactory); connector = new ServerConnector(server, factory, httpConnectionFactory);
} }
server.setStopTimeout(60_000l); server.setStopTimeout(30_000L);
//connector.setStopTimeout(500);
return connector; return connector;
} }
} }

View File

@ -29,8 +29,7 @@ public class SocketConnectorFactory {
public Connector createConnector(Server server) throws Exception { public Connector createConnector(Server server) throws Exception {
ServerConnector connector = new ServerConnector(server); ServerConnector connector = new ServerConnector(server);
server.setStopTimeout(60_000l); server.setStopTimeout(30_000L);
//connector.setStopTimeout(500);
if (transportOptions != null) { if (transportOptions != null) {
IntrospectionSupport.setProperties(connector, transportOptions, ""); IntrospectionSupport.setProperties(connector, transportOptions, "");
} }

View File

@ -21,6 +21,7 @@ import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import org.apache.activemq.transport.http.BlockingQueueTransport;
import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintMapping;
@ -71,7 +72,7 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
server = new Server(); server = new Server();
} }
try { try {
server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 60_000l); server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 30_000L);
} catch (Throwable t) { } catch (Throwable t) {
//ignore, jetty 8. //ignore, jetty 8.
} }

View File

@ -32,6 +32,7 @@ import jakarta.servlet.http.HttpServletResponse;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
@ -103,6 +104,11 @@ public class HttpTunnelServlet extends HttpServlet {
packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
// If the packet is ShutDownInfo then we are shutting down so return.
if (packet instanceof ShutdownInfo) {
return;
}
DataOutputStream stream = new DataOutputStream(response.getOutputStream()); DataOutputStream stream = new DataOutputStream(response.getOutputStream());
wireFormat.marshal(packet, stream); wireFormat.marshal(packet, stream);
count++; count++;
@ -204,7 +210,17 @@ public class HttpTunnelServlet extends HttpServlet {
} }
public void stopped(Service service) { public void stopped(Service service) {
clients.remove(clientID); final BlockingQueueTransport removed = clients.remove(clientID);
if (removed != null) {
try {
// Send a ShutdownInfo() packet on stop so that we unblock any calls
// to transportChannel.getQueue().poll()
removed.getQueue().add(new ShutdownInfo());
} catch (Exception e) {
LOG.debug("Could not send ShutdownInfo() packet to BlockingQueueTransport "
+ "on shutdown for client {}", clientID);
}
}
} }
}); });