Merge remote-tracking branch 'origin/jetty-11.0.x' into jetty-12.0.x

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2022-06-29 17:21:44 +10:00
parent 5c267ce7ed
commit ee02d163c3
12 changed files with 164 additions and 94 deletions

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.ee10.websocket.jakarta.common;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.time.Duration;
@ -24,7 +23,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jakarta.websocket.CloseReason;
@ -37,7 +35,7 @@ import jakarta.websocket.Session;
import jakarta.websocket.WebSocketContainer;
import org.eclipse.jetty.ee10.websocket.jakarta.common.decoders.AvailableDecoders;
import org.eclipse.jetty.ee10.websocket.jakarta.common.encoders.AvailableEncoders;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.internal.util.ReflectUtils;
@ -190,13 +188,11 @@ public class JakartaWebSocketSession implements jakarta.websocket.Session
{
try
{
FutureCallback b = new FutureCallback();
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), Callback.NOOP);
}
catch (IOException e)
catch (Throwable t)
{
LOG.trace("IGNORED", e);
LOG.trace("IGNORED", t);
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.ee10.websocket.jakarta.tests;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
@ -72,7 +73,8 @@ public class JakartaOnCloseTest
public void onClose(CloseReason reason)
{
super.onClose(reason);
onClose.accept(session);
if (onClose != null)
onClose.accept(session);
}
}
@ -136,7 +138,7 @@ public class JakartaOnCloseTest
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) -> assertDoesNotThrow(() ->
session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "custom close reason"))));
session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "custom close reason"))));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
clientEndpoint.session.close();
@ -225,4 +227,36 @@ public class JakartaOnCloseTest
assertThat(clientEndpoint.error, instanceOf(RuntimeException.class));
assertThat(clientEndpoint.error.getMessage(), containsString("trigger onError from client onClose"));
}
@Test
public void testCloseFromCallback() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
CountDownLatch closeSent = new CountDownLatch(1);
clientEndpoint.session.getAsyncRemote().sendText("GOODBYE", sendResult ->
{
try
{
clientEndpoint.session.close();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
finally
{
closeSent.countDown();
}
});
assertTrue(closeSent.await(5, TimeUnit.SECONDS));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.NORMAL_CLOSURE));
}
}

View File

@ -63,6 +63,32 @@ public interface Session extends WebSocketPolicy, Closeable
*/
void close(int statusCode, String reason);
/**
* Send a websocket Close frame, with status code.
* <p>
* This will enqueue a graceful close to the remote endpoint.
*
* @param statusCode the status code
* @param reason the (optional) reason. (can be null for no reason)
* @param callback the callback to track close frame sent (or failed)
* @see StatusCode
* @see #close()
* @see #close(CloseStatus)
* @see #disconnect()
*/
default void close(int statusCode, String reason, WriteCallback callback)
{
try
{
close(statusCode, reason);
callback.writeSuccess();
}
catch (Throwable t)
{
callback.writeFailed(t);
}
}
/**
* Issue a harsh disconnect of the underlying connection.
* <p>

View File

@ -20,7 +20,9 @@ package org.eclipse.jetty.ee10.websocket.api;
*/
public interface WriteCallback
{
WriteCallback NOOP = new Adaptor();
WriteCallback NOOP = new WriteCallback()
{
};
/**
* <p>
@ -44,6 +46,7 @@ public interface WriteCallback
{
}
@Deprecated
class Adaptor implements WriteCallback
{
@Override

View File

@ -20,7 +20,6 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.ee10.websocket.api.BatchMode;
import org.eclipse.jetty.ee10.websocket.api.StatusCode;
import org.eclipse.jetty.ee10.websocket.api.WriteCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -48,37 +47,6 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.ee10.webs
this.batchMode = batchMode;
}
/**
* Initiate close of the Remote with no status code (no payload)
*
* @since 10.0
*/
public void close()
{
close(StatusCode.NO_CODE, null);
}
/**
* Initiate close of the Remote with specified status code and optional reason phrase
*
* @param statusCode the status code (must be valid and can be sent)
* @param reason optional reason code
* @since 10.0
*/
public void close(int statusCode, String reason)
{
try
{
FutureCallback b = new FutureCallback();
coreSession.close(statusCode, reason, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
@Override
public void sendString(String text) throws IOException
{
@ -225,7 +193,7 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.ee10.webs
}
@Override
public org.eclipse.jetty.ee10.websocket.api.BatchMode getBatchMode()
public BatchMode getBatchMode()
{
return batchMode;
}

View File

@ -26,6 +26,8 @@ import org.eclipse.jetty.ee10.websocket.api.UpgradeRequest;
import org.eclipse.jetty.ee10.websocket.api.UpgradeResponse;
import org.eclipse.jetty.ee10.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.ee10.websocket.api.WebSocketContainer;
import org.eclipse.jetty.ee10.websocket.api.WriteCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.slf4j.Logger;
@ -53,19 +55,25 @@ public class WebSocketSession implements Session, SuspendToken, Dumpable
@Override
public void close()
{
remoteEndpoint.close(StatusCode.NORMAL, null);
coreSession.close(StatusCode.NORMAL, null, Callback.NOOP);
}
@Override
public void close(CloseStatus closeStatus)
{
remoteEndpoint.close(closeStatus.getCode(), closeStatus.getPhrase());
coreSession.close(closeStatus.getCode(), closeStatus.getPhrase(), Callback.NOOP);
}
@Override
public void close(int statusCode, String reason)
{
remoteEndpoint.close(statusCode, reason);
coreSession.close(statusCode, reason, Callback.NOOP);
}
@Override
public void close(int statusCode, String reason, WriteCallback callback)
{
coreSession.close(statusCode, reason, Callback.from(callback::writeSuccess, callback::writeFailed));
}
@Override

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.ee9.websocket.jakarta.common;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.time.Duration;
@ -24,7 +23,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import jakarta.websocket.CloseReason;
@ -37,7 +35,7 @@ import jakarta.websocket.Session;
import jakarta.websocket.WebSocketContainer;
import org.eclipse.jetty.ee9.websocket.jakarta.common.decoders.AvailableDecoders;
import org.eclipse.jetty.ee9.websocket.jakarta.common.encoders.AvailableEncoders;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.internal.util.ReflectUtils;
@ -190,13 +188,11 @@ public class JakartaWebSocketSession implements jakarta.websocket.Session
{
try
{
FutureCallback b = new FutureCallback();
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), Callback.NOOP);
}
catch (IOException e)
catch (Throwable t)
{
LOG.trace("IGNORED", e);
LOG.trace("IGNORED", t);
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.ee9.websocket.jakarta.tests;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
@ -72,7 +73,8 @@ public class JakartaOnCloseTest
public void onClose(CloseReason reason)
{
super.onClose(reason);
onClose.accept(session);
if (onClose != null)
onClose.accept(session);
}
}
@ -136,7 +138,7 @@ public class JakartaOnCloseTest
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
serverEndpoint.setOnClose((session) -> assertDoesNotThrow(() ->
session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "custom close reason"))));
session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "custom close reason"))));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
clientEndpoint.session.close();
@ -225,4 +227,36 @@ public class JakartaOnCloseTest
assertThat(clientEndpoint.error, instanceOf(RuntimeException.class));
assertThat(clientEndpoint.error.getMessage(), containsString("trigger onError from client onClose"));
}
@Test
public void testCloseFromCallback() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
client.connectToServer(clientEndpoint, uri);
OnCloseEndpoint serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));
assertTrue(serverEndpoint.openLatch.await(5, TimeUnit.SECONDS));
CountDownLatch closeSent = new CountDownLatch(1);
clientEndpoint.session.getAsyncRemote().sendText("GOODBYE", sendResult ->
{
try
{
clientEndpoint.session.close();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
finally
{
closeSent.countDown();
}
});
assertTrue(closeSent.await(5, TimeUnit.SECONDS));
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseCodes.NORMAL_CLOSURE));
}
}

View File

@ -63,6 +63,32 @@ public interface Session extends WebSocketPolicy, Closeable
*/
void close(int statusCode, String reason);
/**
* Send a websocket Close frame, with status code.
* <p>
* This will enqueue a graceful close to the remote endpoint.
*
* @param statusCode the status code
* @param reason the (optional) reason. (can be null for no reason)
* @param callback the callback to track close frame sent (or failed)
* @see StatusCode
* @see #close()
* @see #close(CloseStatus)
* @see #disconnect()
*/
default void close(int statusCode, String reason, WriteCallback callback)
{
try
{
close(statusCode, reason);
callback.writeSuccess();
}
catch (Throwable t)
{
callback.writeFailed(t);
}
}
/**
* Issue a harsh disconnect of the underlying connection.
* <p>

View File

@ -20,7 +20,9 @@ package org.eclipse.jetty.ee9.websocket.api;
*/
public interface WriteCallback
{
WriteCallback NOOP = new Adaptor();
WriteCallback NOOP = new WriteCallback()
{
};
/**
* <p>
@ -44,6 +46,7 @@ public interface WriteCallback
{
}
@Deprecated
class Adaptor implements WriteCallback
{
@Override

View File

@ -20,7 +20,6 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.ee9.websocket.api.BatchMode;
import org.eclipse.jetty.ee9.websocket.api.StatusCode;
import org.eclipse.jetty.ee9.websocket.api.WriteCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -48,37 +47,6 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.ee9.webso
this.batchMode = batchMode;
}
/**
* Initiate close of the Remote with no status code (no payload)
*
* @since 10.0
*/
public void close()
{
close(StatusCode.NO_CODE, null);
}
/**
* Initiate close of the Remote with specified status code and optional reason phrase
*
* @param statusCode the status code (must be valid and can be sent)
* @param reason optional reason code
* @since 10.0
*/
public void close(int statusCode, String reason)
{
try
{
FutureCallback b = new FutureCallback();
coreSession.close(statusCode, reason, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
@Override
public void sendString(String text) throws IOException
{
@ -225,7 +193,7 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.ee9.webso
}
@Override
public org.eclipse.jetty.ee9.websocket.api.BatchMode getBatchMode()
public BatchMode getBatchMode()
{
return batchMode;
}

View File

@ -26,6 +26,8 @@ import org.eclipse.jetty.ee9.websocket.api.UpgradeRequest;
import org.eclipse.jetty.ee9.websocket.api.UpgradeResponse;
import org.eclipse.jetty.ee9.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.ee9.websocket.api.WebSocketContainer;
import org.eclipse.jetty.ee9.websocket.api.WriteCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.slf4j.Logger;
@ -53,19 +55,25 @@ public class WebSocketSession implements Session, SuspendToken, Dumpable
@Override
public void close()
{
remoteEndpoint.close(StatusCode.NORMAL, null);
coreSession.close(StatusCode.NORMAL, null, Callback.NOOP);
}
@Override
public void close(CloseStatus closeStatus)
{
remoteEndpoint.close(closeStatus.getCode(), closeStatus.getPhrase());
coreSession.close(closeStatus.getCode(), closeStatus.getPhrase(), Callback.NOOP);
}
@Override
public void close(int statusCode, String reason)
{
remoteEndpoint.close(statusCode, reason);
coreSession.close(statusCode, reason, Callback.NOOP);
}
@Override
public void close(int statusCode, String reason, WriteCallback callback)
{
coreSession.close(statusCode, reason, Callback.from(callback::writeSuccess, callback::writeFailed));
}
@Override