Merge pull request #8164 from eclipse/jetty-10.0.x-8151-websocketClose
Issue #8151 - make websocket close non-blocking
This commit is contained in:
commit
13c61684fe
|
@ -13,7 +13,6 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.javax.common;
|
package org.eclipse.jetty.websocket.javax.common;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -24,7 +23,6 @@ import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.websocket.CloseReason;
|
import javax.websocket.CloseReason;
|
||||||
import javax.websocket.EndpointConfig;
|
import javax.websocket.EndpointConfig;
|
||||||
|
@ -35,7 +33,7 @@ import javax.websocket.RemoteEndpoint.Basic;
|
||||||
import javax.websocket.Session;
|
import javax.websocket.Session;
|
||||||
import javax.websocket.WebSocketContainer;
|
import javax.websocket.WebSocketContainer;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.FutureCallback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.websocket.core.CoreSession;
|
import org.eclipse.jetty.websocket.core.CoreSession;
|
||||||
import org.eclipse.jetty.websocket.core.ExtensionConfig;
|
import org.eclipse.jetty.websocket.core.ExtensionConfig;
|
||||||
import org.eclipse.jetty.websocket.core.internal.util.ReflectUtils;
|
import org.eclipse.jetty.websocket.core.internal.util.ReflectUtils;
|
||||||
|
@ -190,13 +188,11 @@ public class JavaxWebSocketSession implements javax.websocket.Session
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
FutureCallback b = new FutureCallback();
|
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), Callback.NOOP);
|
||||||
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
|
|
||||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
catch (IOException e)
|
catch (Throwable t)
|
||||||
{
|
{
|
||||||
LOG.trace("IGNORED", e);
|
LOG.trace("IGNORED", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.javax.tests;
|
package org.eclipse.jetty.websocket.javax.tests;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -72,7 +73,8 @@ public class JavaxOnCloseTest
|
||||||
public void onClose(CloseReason reason)
|
public void onClose(CloseReason reason)
|
||||||
{
|
{
|
||||||
super.onClose(reason);
|
super.onClose(reason);
|
||||||
onClose.accept(session);
|
if (onClose != null)
|
||||||
|
onClose.accept(session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,4 +228,36 @@ public class JavaxOnCloseTest
|
||||||
assertThat(clientEndpoint.error, instanceOf(RuntimeException.class));
|
assertThat(clientEndpoint.error, instanceOf(RuntimeException.class));
|
||||||
assertThat(clientEndpoint.error.getMessage(), containsString("trigger onError from client onClose"));
|
assertThat(clientEndpoint.error.getMessage(), containsString("trigger onError from client onClose"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseFromCallback() throws Exception
|
||||||
|
{
|
||||||
|
EventSocket clientEndpoint = new EventSocket();
|
||||||
|
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
|
||||||
|
client.connectToServer(clientEndpoint, uri);
|
||||||
|
|
||||||
|
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
CountDownLatch closeSent = new CountDownLatch(1);
|
||||||
|
clientEndpoint.session.getAsyncRemote().sendText("GOODBYE", sendResult ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
clientEndpoint.session.close();
|
||||||
|
}
|
||||||
|
catch (IOException e)
|
||||||
|
{
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
closeSent.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue(closeSent.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.NORMAL_CLOSURE));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,32 @@ public interface Session extends WebSocketPolicy, Closeable
|
||||||
*/
|
*/
|
||||||
void close(int statusCode, String reason);
|
void close(int statusCode, String reason);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a websocket Close frame, with status code.
|
||||||
|
* <p>
|
||||||
|
* This will enqueue a graceful close to the remote endpoint.
|
||||||
|
*
|
||||||
|
* @param statusCode the status code
|
||||||
|
* @param reason the (optional) reason. (can be null for no reason)
|
||||||
|
* @param callback the callback to track close frame sent (or failed)
|
||||||
|
* @see StatusCode
|
||||||
|
* @see #close()
|
||||||
|
* @see #close(CloseStatus)
|
||||||
|
* @see #disconnect()
|
||||||
|
*/
|
||||||
|
default void close(int statusCode, String reason, WriteCallback callback)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
close(statusCode, reason);
|
||||||
|
callback.writeSuccess();
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
callback.writeFailed(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Issue a harsh disconnect of the underlying connection.
|
* Issue a harsh disconnect of the underlying connection.
|
||||||
* <p>
|
* <p>
|
||||||
|
|
|
@ -20,7 +20,9 @@ package org.eclipse.jetty.websocket.api;
|
||||||
*/
|
*/
|
||||||
public interface WriteCallback
|
public interface WriteCallback
|
||||||
{
|
{
|
||||||
WriteCallback NOOP = new Adaptor();
|
WriteCallback NOOP = new WriteCallback()
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -44,6 +46,7 @@ public interface WriteCallback
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
class Adaptor implements WriteCallback
|
class Adaptor implements WriteCallback
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.FutureCallback;
|
import org.eclipse.jetty.util.FutureCallback;
|
||||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
|
||||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
import org.eclipse.jetty.websocket.core.CoreSession;
|
import org.eclipse.jetty.websocket.core.CoreSession;
|
||||||
import org.eclipse.jetty.websocket.core.Frame;
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
|
@ -48,37 +47,6 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
||||||
this.batchMode = batchMode;
|
this.batchMode = batchMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Initiate close of the Remote with no status code (no payload)
|
|
||||||
*
|
|
||||||
* @since 10.0
|
|
||||||
*/
|
|
||||||
public void close()
|
|
||||||
{
|
|
||||||
close(StatusCode.NO_CODE, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initiate close of the Remote with specified status code and optional reason phrase
|
|
||||||
*
|
|
||||||
* @param statusCode the status code (must be valid and can be sent)
|
|
||||||
* @param reason optional reason code
|
|
||||||
* @since 10.0
|
|
||||||
*/
|
|
||||||
public void close(int statusCode, String reason)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
FutureCallback b = new FutureCallback();
|
|
||||||
coreSession.close(statusCode, reason, b);
|
|
||||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
catch (IOException e)
|
|
||||||
{
|
|
||||||
LOG.trace("IGNORED", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendString(String text) throws IOException
|
public void sendString(String text) throws IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,6 +18,7 @@ import java.net.SocketAddress;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.component.Dumpable;
|
import org.eclipse.jetty.util.component.Dumpable;
|
||||||
import org.eclipse.jetty.websocket.api.CloseStatus;
|
import org.eclipse.jetty.websocket.api.CloseStatus;
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
|
@ -27,6 +28,7 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||||
import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||||
import org.eclipse.jetty.websocket.api.WebSocketContainer;
|
import org.eclipse.jetty.websocket.api.WebSocketContainer;
|
||||||
|
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
import org.eclipse.jetty.websocket.core.CoreSession;
|
import org.eclipse.jetty.websocket.core.CoreSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -53,19 +55,25 @@ public class WebSocketSession implements Session, SuspendToken, Dumpable
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
remoteEndpoint.close(StatusCode.NORMAL, null);
|
coreSession.close(StatusCode.NORMAL, null, Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(CloseStatus closeStatus)
|
public void close(CloseStatus closeStatus)
|
||||||
{
|
{
|
||||||
remoteEndpoint.close(closeStatus.getCode(), closeStatus.getPhrase());
|
coreSession.close(closeStatus.getCode(), closeStatus.getPhrase(), Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(int statusCode, String reason)
|
public void close(int statusCode, String reason)
|
||||||
{
|
{
|
||||||
remoteEndpoint.close(statusCode, reason);
|
coreSession.close(statusCode, reason, Callback.NOOP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(int statusCode, String reason, WriteCallback callback)
|
||||||
|
{
|
||||||
|
coreSession.close(statusCode, reason, Callback.from(callback::writeSuccess, callback::writeFailed));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue