Issue #11358 - Add API to handle timeouts in WebSocket

Signed-off-by: Lachlan Roberts <lachlan.p.roberts@gmail.com>
This commit is contained in:
Lachlan Roberts 2024-10-09 23:41:28 +11:00
parent 0553fe3030
commit cb1a880dc4
No known key found for this signature in database
GPG Key ID: 5663FB7A8FF7E348
6 changed files with 205 additions and 29 deletions

View File

@ -19,9 +19,11 @@ import java.net.URI;
import java.nio.channels.ReadPendingException;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.exception.WebSocketTimeoutException;
/**
* Represents the outgoing Frames.
@ -191,6 +193,19 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
*/
boolean isRsv3Used();
/**
* <p>Adds a listener for websocket timeouts.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the timeout should be handled as a fatal failure or {@code false} to ignore
* that specific timeout and for another timeout to occur after another idle period.</p>
* <p>Listeners are processed in the same order they are added, and the first that
* returns {@code true} stops the processing of subsequent listeners, which are
* therefore not invoked.</p>
*
* @param onTimeout the idle timeout listener as a predicate function
*/
void addTimeoutListener(Predicate<WebSocketTimeoutException> onTimeout);
class Empty extends ConfigurationCustomizer implements CoreSession
{
@Override
@ -341,5 +356,10 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
{
return false;
}
@Override
public void addTimeoutListener(Predicate<WebSocketTimeoutException> onTimeout)
{
}
}
}

View File

@ -225,21 +225,24 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
{
if (LOG.isDebugEnabled())
LOG.debug("onIdleExpired()");
// treat as a handler error because socket is still open
coreSession.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout", timeoutException), Callback.NOOP);
return true;
WebSocketTimeoutException exception = new WebSocketTimeoutException("Connection Idle Timeout", timeoutException);
boolean closeConnection = coreSession.onTimeout(exception);
if (closeConnection)
coreSession.processConnectionError(exception, Callback.NOOP);
return closeConnection;
}
@Override
protected boolean onReadTimeout(TimeoutException timeout)
protected boolean onReadTimeout(TimeoutException timeoutException)
{
if (LOG.isDebugEnabled())
LOG.debug("onReadTimeout()");
// treat as a handler error because socket is still open
coreSession.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout), Callback.NOOP);
return false;
WebSocketTimeoutException exception = new WebSocketTimeoutException("Timeout on Read", timeoutException);
boolean closeConnection = coreSession.onTimeout(exception);
if (closeConnection)
coreSession.processConnectionError(exception, Callback.NOOP);
return closeConnection;
}
protected void onFrame(Frame.Parsed frame)

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
@ -71,6 +72,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
private Duration idleTimeout = WebSocketConstants.DEFAULT_IDLE_TIMEOUT;
private Duration writeTimeout = WebSocketConstants.DEFAULT_WRITE_TIMEOUT;
private ClassLoader classLoader;
private Predicate<WebSocketTimeoutException> _onTimeout;
public WebSocketCoreSession(FrameHandler handler, Behavior behavior, Negotiated negotiated, WebSocketComponents components)
{
@ -304,24 +306,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
else
code = CloseStatus.NO_CLOSE;
CloseStatus closeStatus = new CloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
{
close(closeStatus, callback);
}
else
{
if (sessionState.onClosed(closeStatus))
{
closeConnection(closeStatus, callback);
}
else
{
// We are already closed because of a previous failure.
// Succeed because failing might re-enter this branch if it's the Frame callback.
callback.succeeded();
}
}
processError(new CloseStatus(code, cause), callback);
}
/**
@ -348,8 +333,12 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
else
code = CloseStatus.SERVER_ERROR;
CloseStatus closeStatus = new CloseStatus(code, cause);
if (CloseStatus.isTransmittableStatusCode(code))
processError(new CloseStatus(code, cause), callback);
}
private void processError(CloseStatus closeStatus, Callback callback)
{
if (CloseStatus.isTransmittableStatusCode(closeStatus.getCode()))
{
close(closeStatus, callback);
}
@ -403,7 +392,7 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
{
openCallback.failed(t);
/* This is double handling of the exception but we need to do this because we have two separate
/* This is double handling of the exception, but we need to do this because we have two separate
mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback
and the CompletableFuture require the exception. */
throw new RuntimeException(t);
@ -434,6 +423,36 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
return getExtensionStack().isRsv3Used();
}
@Override
public void addTimeoutListener(Predicate<WebSocketTimeoutException> onTimeout)
{
if (_onTimeout == null)
{
_onTimeout = onTimeout;
}
else
{
Predicate<WebSocketTimeoutException> previous = _onTimeout;
_onTimeout = throwable ->
{
if (!previous.test(throwable))
return onTimeout.test(throwable);
return true;
};
}
}
/**
* @return true to let the EndPoint handle the timeout, false to tell the EndPoint to halt the handling of the timeout.
**/
public boolean onTimeout(WebSocketTimeoutException timeoutException)
{
if (_onTimeout == null)
return true;
else
return _onTimeout.test(timeoutException);
}
public WebSocketConnection getConnection()
{
return connection;

View File

@ -16,8 +16,10 @@ package org.eclipse.jetty.websocket.api;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Predicate;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;
/**
* <p>{@link Session} represents an active link of
@ -201,6 +203,19 @@ public interface Session extends Configurable, Closeable
*/
boolean isSecure();
/**
* <p>Adds a listener for websocket timeouts.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the timeout should be handled as a fatal failure or {@code false} to ignore
* that specific timeout and for another timeout to occur after another idle period.</p>
* <p>Listeners are processed in the same order they are added, and the first that
* returns {@code true} stops the processing of subsequent listeners, which are
* therefore not invoked.</p>
*
* @param onTimeout the idle timeout listener as a predicate function
*/
void addTimeoutListener(Predicate<WebSocketTimeoutException> onTimeout);
/**
* <p>The passive link of communication with a remote WebSocket endpoint.</p>
* <p>Applications provide WebSocket endpoints that implement this interface

View File

@ -18,6 +18,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Predicate;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.Dumpable;
@ -26,6 +27,7 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
@ -274,6 +276,12 @@ public class WebSocketSession implements Session, Dumpable
return upgradeRequest.isSecure();
}
@Override
public void addTimeoutListener(Predicate<WebSocketTimeoutException> onTimeout)
{
coreSession.addTimeoutListener(t -> onTimeout.test(new WebSocketTimeoutException(t)));
}
@Override
public void disconnect()
{

View File

@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.websocket.api.StatusCode.UNDEFINED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketIdleTimeoutTest
{
private final int IDLE_TIMEOUT = 1000;
private final AtomicBoolean _allowTimeout = new AtomicBoolean();
private Server _server;
private ServerConnector _connector;
private WebSocketClient _client;
private TimeoutEndpoint _serverEndpoint;
@BeforeEach
public void before() throws Exception
{
_server = new Server();
_connector = new ServerConnector(_server);
_server.addConnector(_connector);
WebSocketUpgradeHandler upgradeHandler = WebSocketUpgradeHandler.from(_server);
_serverEndpoint = new TimeoutEndpoint();
upgradeHandler.getServerWebSocketContainer().addMapping("/", (req, resp, cb) -> _serverEndpoint);
upgradeHandler.getServerWebSocketContainer().setIdleTimeout(Duration.ofMillis(IDLE_TIMEOUT));
_server.setHandler(upgradeHandler);
_server.start();
_client = new WebSocketClient();
_client.start();
}
@AfterEach
public void after() throws Exception
{
_client.stop();
_server.stop();
}
public class TimeoutEndpoint extends EventSocket
{
volatile CountDownLatch timeoutLatch;
public void awaitTimeouts(int num) throws InterruptedException
{
timeoutLatch = new CountDownLatch(num);
timeoutLatch.await();
}
@Override
public void onOpen(Session session)
{
session.addTimeoutListener(t ->
{
if (timeoutLatch != null)
timeoutLatch.countDown();
return _allowTimeout.get();
});
super.onOpen(session);
}
}
@Test
public void testWebSocketIdleTimeout() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
_client.connect(clientEndpoint, URI.create("ws://localhost:" + _connector.getLocalPort()));
assertTrue(_serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
// The WebSocket connection has not been closed but multiple timeout events have occurred.
_serverEndpoint.awaitTimeouts(3);
assertThat(_serverEndpoint.closeCode, equalTo(UNDEFINED));
assertThat(_serverEndpoint.closeLatch.getCount(), equalTo(1L));
// Allow the timeout listener to close the connection.
_allowTimeout.set(true);
assertTrue(_serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(_serverEndpoint.error, instanceOf(WebSocketTimeoutException.class));
}
}