Issue #3290 async onOpen, onClose and onError

Changes after review:
 + removed Adaptor from FrameHandler, so all non test usages of
   FrameHandler now use async API.
 + Fixed sequencing of multiple async operation so callback is notified
   after completion (created more Callback.from utilities for this)
 + fixed import order

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-29 10:33:37 +11:00
parent 1171d7c8d7
commit 84d74ba1de
31 changed files with 768 additions and 582 deletions

View File

@ -18,20 +18,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import org.eclipse.jetty.http.*;
import org.eclipse.jetty.io.*;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -46,6 +32,33 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/** /**
* HttpChannel represents a single endpoint for HTTP semantic processing. * HttpChannel represents a single endpoint for HTTP semantic processing.
* The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from * The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from
@ -561,8 +574,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
*/ */
protected void handleException(Throwable failure) protected void handleException(Throwable failure)
{ {
LOG.warn(failure);
// Unwrap wrapping Jetty and Servlet exceptions. // Unwrap wrapping Jetty and Servlet exceptions.
Throwable quiet = unwrap(failure, QuietException.class); Throwable quiet = unwrap(failure, QuietException.class);
Throwable no_stack = unwrap(failure, BadMessageException.class, IOException.class, TimeoutException.class); Throwable no_stack = unwrap(failure, BadMessageException.class, IOException.class, TimeoutException.class);

View File

@ -111,6 +111,12 @@ public interface Callback extends Invocable
}; };
} }
/**
* Create a callback from the passed success and failure
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(Runnable success, Consumer<Throwable> failure) static Callback from(Runnable success, Consumer<Throwable> failure)
{ {
return new Callback() return new Callback()
@ -129,6 +135,10 @@ public interface Callback extends Invocable
}; };
} }
/** Creaste a callback that runs completed when it succeeds or fails
* @param completed The completion to run on success or failure
* @return a new callback
*/
static Callback from(Runnable completed) static Callback from(Runnable completed)
{ {
return new Completing() return new Completing()
@ -140,6 +150,67 @@ public interface Callback extends Invocable
}; };
} }
/**
* Create a nested callback that runs completed after
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run after the nested callback is completed
* @return a new callback.
*/
static Callback from(Callback callback, Runnable completed)
{
return new Nested(callback)
{
public void completed()
{
completed.run();
}
};
}
/**
* Create a nested callback that runs completed before
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run before the nested callback is completed. Any exceptions thrown
* from completed will result in a callback failure.
* @return a new callback.
*/
static Callback from(Runnable completed, Callback callback)
{
return new Callback()
{
@Override
public void succeeded()
{
try
{
completed.run();
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
@Override
public void failed(Throwable x)
{
try
{
completed.run();
}
catch(Throwable t)
{
x.addSuppressed(t);
}
callback.failed(x);
}
};
}
class Completing implements Callback class Completing implements Callback
{ {
@Override @Override
@ -158,7 +229,11 @@ public interface Callback extends Invocable
{ {
} }
} }
/**
* Nested Completing Callback that completes after
* completing the nested callback
*/
class Nested extends Completing class Nested extends Completing
{ {
private final Callback callback; private final Callback callback;

View File

@ -18,15 +18,6 @@
package org.eclipse.jetty.websocket.javax.client; package org.eclipse.jetty.websocket.javax.client;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.javax.common.*;
import javax.websocket.*;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Objects; import java.util.Objects;
@ -34,8 +25,29 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.Session;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
/** /**
* Container for Client use of the javax.websocket API. * Container for Client use of the javax.websocket API.
* <p> * <p>
@ -161,12 +173,15 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
try try
{ {
Future<Session> sessionFuture = connect(upgradeRequest); Future<Session> sessionFuture = connect(upgradeRequest);
long timeout = getDefaultMaxSessionIdleTimeout(); long timeout = coreClient.getHttpClient().getConnectTimeout();
if (timeout>0) if (timeout>0)
return sessionFuture.get(timeout, TimeUnit.MILLISECONDS); return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS);
return sessionFuture.get(); return sessionFuture.get();
} }
catch (TimeoutException e)
{
throw new IOException("Connection future not completed " + destURI, e);
}
catch (Exception e) catch (Exception e)
{ {
throw new IOException("Unable to connect to " + destURI, e); throw new IOException("Unable to connect to " + destURI, e);

View File

@ -18,26 +18,47 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.messages.*;
import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils;
import javax.websocket.MessageHandler;
import javax.websocket.*;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor import javax.websocket.CloseReason;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryStreamMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextStreamMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialByteArrayMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialByteBufferMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialStringMessageSink;
import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils;
public class JavaxWebSocketFrameHandler implements FrameHandler
{ {
private final Logger LOG; private final Logger LOG;
private final JavaxWebSocketContainer container; private final JavaxWebSocketContainer container;
@ -171,106 +192,57 @@ public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onOpen(CoreSession coreSession, Callback callback)
{ {
if (closeHandle != null)
{
try
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause);
}
}
container.removeBean(session);
}
@SuppressWarnings("Duplicates")
@Override
public void onError(Throwable cause)
{
futureSession.completeExceptionally(cause);
if (errorHandle == null)
{
LOG.warn("Unhandled Error: " + endpointInstance, cause);
return;
}
try try
{ {
errorHandle.invoke(cause); this.coreSession = coreSession;
} session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
throw wsError;
}
}
@Override openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
public void onOpen(CoreSession coreSession) throws Exception closeHandle = InvokerUtils.bindTo(closeHandle, session);
{ errorHandle = InvokerUtils.bindTo(errorHandle, session);
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig); JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
closeHandle = InvokerUtils.bindTo(closeHandle, session); JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
errorHandle = InvokerUtils.bindTo(errorHandle, session);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata); pongHandle = InvokerUtils.bindTo(pongHandle, session);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
pongHandle = InvokerUtils.bindTo(pongHandle, session); if (actualTextMetadata != null)
if (actualTextMetadata != null)
{
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
textMetadata = actualTextMetadata;
}
if (actualBinaryMetadata != null)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
binaryMetadata = actualBinaryMetadata;
}
if (openHandle != null)
{
try
{ {
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
textMetadata = actualTextMetadata;
}
if (actualBinaryMetadata != null)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
binaryMetadata = actualBinaryMetadata;
}
if (openHandle != null)
openHandle.invoke(); openHandle.invoke();
}
catch (Throwable cause)
{
Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
// TODO This feels like double handling of the exception? Review need for futureSession container.addBean(session, true);
futureSession.completeExceptionally(wse); futureSession.complete(session);
throw wse; callback.succeeded();
}
} }
catch (Throwable cause)
{
Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
container.addBean(session, true); // TODO This feels like double handling of the exception? Review need for futureSession
futureSession.complete(session); futureSession.completeExceptionally(wse);
callback.failed(wse);
}
} }
/**
* @see #onFrame(Frame,Callback)
*/
public final void onFrame(Frame frame) {}
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
@ -302,6 +274,50 @@ public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor
dataType = OpCode.UNDEFINED; dataType = OpCode.UNDEFINED;
} }
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
if (closeHandle != null)
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
container.removeBean(session);
callback.succeeded();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause));
}
}
@Override
public void onError(Throwable cause, Callback callback)
{
try
{
futureSession.completeExceptionally(cause);
if (errorHandle != null)
errorHandle.invoke(cause);
else
LOG.warn("Unhandled Error: " + endpointInstance, cause);
callback.succeeded();
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
// TODO should futureSession be failed here?
}
}
public Set<MessageHandler> getMessageHandlers() public Set<MessageHandler> getMessageHandlers()
{ {
if (messageHandlerMap.isEmpty()) if (messageHandlerMap.isEmpty())

View File

@ -220,10 +220,10 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
{ {
getBasicRemote().sendObject(obj); getBasicRemote().sendObject(obj);
} }
catch (Throwable cause) catch (Exception cause)
{ {
// TODO: need way to fail Channel. // TODO review this
frameHandler.onError(cause); throw new RuntimeException(cause);
} }
} }
} }

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -25,12 +32,6 @@ import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -44,12 +45,12 @@ public class JavaxWebSocketFrameHandler_OnCloseTest extends AbstractJavaxWebSock
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests // These invocations are the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal"); CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal");
Frame closeFrame = status.toFrame(); Frame closeFrame = status.toFrame();
localEndpoint.onFrame(closeFrame, Callback.from(() -> localEndpoint.onFrame(closeFrame, Callback.from(() ->
{ {
localEndpoint.onClosed(status); localEndpoint.onClosed(status, Callback.NOOP);
}, t -> }, t ->
{ {
throw new RuntimeException(t); throw new RuntimeException(t);

View File

@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import java.util.concurrent.TimeUnit;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpoint;
import javax.websocket.OnError; import javax.websocket.OnError;
import javax.websocket.Session; import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
@ -40,8 +42,8 @@ public class JavaxWebSocketFrameHandler_OnErrorTest extends AbstractJavaxWebSock
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests // These invocations are the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onError(new RuntimeException("From Testcase")); localEndpoint.onError(new RuntimeException("From Testcase"), Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS); String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher); assertThat("Event", event, eventMatcher);
} }

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -25,14 +34,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -44,7 +45,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest extends Abstr
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests // This invocation is the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint); func.apply(localEndpoint);

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
@ -47,7 +48,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryTest extends AbstractJav
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests // This invocation is the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
assertThat("Has Binary Metadata", localEndpoint.getBinaryMetadata(), notNullValue()); assertThat("Has Binary Metadata", localEndpoint.getBinaryMetadata(), notNullValue());

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -25,13 +33,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -43,7 +44,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests // This invocation is the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint); func.apply(localEndpoint);

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -46,7 +47,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextTest extends AbstractJavax
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests // This invocation is the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8); ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload(payload).setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload(payload).setFin(true), Callback.NOOP);

View File

@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import java.util.concurrent.TimeUnit;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpoint;
import javax.websocket.OnOpen; import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
@ -38,7 +40,7 @@ public class JavaxWebSocketFrameHandler_OnOpenTest extends AbstractJavaxWebSocke
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests // This invocation is the same for all tests
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS); String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher); assertThat("Event", event, eventMatcher);
} }

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.javax.tests; package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -32,16 +42,6 @@ import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator; import org.eclipse.jetty.websocket.core.internal.Generator;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{ {
private final LocalServer server; private final LocalServer server;
@ -217,7 +217,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
} }
} }
public static class FrameCapture implements FrameHandler.Adaptor public static class FrameCapture implements FrameHandler
{ {
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>(); private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint; private final EndPoint endPoint;
@ -229,32 +229,34 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.endPoint = endPoint; this.endPoint = endPoint;
} }
@Override
public void onClosed(CloseStatus closeStatus)
{
}
@Override @Override
public void onError(Throwable cause) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
this.session = coreSession;
callback.succeeded();
} }
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
receivedFrames.offer(Frame.copy(frame)); receivedFrames.offer(Frame.copy(frame));
synchronized(this) callback.succeeded();
{
callback.succeeded();
}
} }
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onError(Throwable cause, Callback callback)
{ {
this.session = coreSession; callback.succeeded();
} }
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
callback.succeeded();
}
public void writeRaw(ByteBuffer buffer) throws IOException public void writeRaw(ByteBuffer buffer) throws IOException
{ {
synchronized (this) synchronized (this)

View File

@ -25,16 +25,17 @@ import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
public class FrameEcho implements FrameHandler.Adaptor public class FrameEcho implements FrameHandler
{ {
private Logger LOG = Log.getLogger(FrameEcho.class); private Logger LOG = Log.getLogger(FrameEcho.class);
private CoreSession coreSession; private CoreSession coreSession;
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
this.coreSession = coreSession; this.coreSession = coreSession;
callback.succeeded();
} }
@Override @Override
@ -47,15 +48,18 @@ public class FrameEcho implements FrameHandler.Adaptor
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onError(Throwable cause, Callback callback)
{
coreSession = null;
}
@Override
public void onError(Throwable cause) throws Exception
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause); LOG.debug(this + " onError ", cause);
callback.succeeded();
} }
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
coreSession = null;
callback.succeeded();
}
} }

View File

@ -18,17 +18,17 @@
package org.eclipse.jetty.websocket.javax.tests.framehandlers; package org.eclipse.jetty.websocket.javax.tests.framehandlers;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.MessageHandler;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.MessageHandler;
public class FrameHandlerTracker extends MessageHandler public class FrameHandlerTracker extends MessageHandler
{ {
public CountDownLatch openLatch = new CountDownLatch(1); public CountDownLatch openLatch = new CountDownLatch(1);
@ -45,26 +45,9 @@ public class FrameHandlerTracker extends MessageHandler
} }
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
super.onOpen(coreSession); super.onOpen(coreSession, Callback.from(callback,()->openLatch.countDown()));
openLatch.countDown();
}
@Override
public void onClosed(CloseStatus closeStatus) throws Exception
{
super.onClosed(closeStatus);
closeDetail.compareAndSet(null, closeStatus);
closeLatch.countDown();
}
@Override
public void onError(Throwable cause) throws Exception
{
super.onError(cause);
error.compareAndSet(null, cause);
} }
@Override @Override
@ -80,4 +63,21 @@ public class FrameHandlerTracker extends MessageHandler
bufferQueue.offer(BufferUtil.copy(wholeMessage)); bufferQueue.offer(BufferUtil.copy(wholeMessage));
callback.succeeded(); callback.succeeded();
} }
@Override
public void onError(Throwable cause, Callback callback)
{
super.onError(cause, Callback.from(callback, ()-> error.compareAndSet(null, cause)));
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
super.onClosed(closeStatus, Callback.from(callback,()->
{
closeDetail.compareAndSet(null, closeStatus);
closeLatch.countDown();
}));
}
} }

View File

@ -18,6 +18,23 @@
package org.eclipse.jetty.websocket.javax.tests.client; package org.eclipse.jetty.websocket.javax.tests.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
@ -28,31 +45,15 @@ import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler; import org.eclipse.jetty.websocket.core.MessageHandler;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.eclipse.jetty.websocket.javax.tests.CoreServer; import org.eclipse.jetty.websocket.javax.tests.CoreServer;
import org.eclipse.jetty.websocket.javax.tests.DataUtils; import org.eclipse.jetty.websocket.javax.tests.DataUtils;
import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -292,9 +293,10 @@ public class MessageReceivingTest
} }
@Override @Override
public void onError(Throwable cause) public void onError(Throwable cause, Callback callback)
{ {
LOG.warn(cause); LOG.warn(cause);
callback.succeeded();
} }
} }
@ -323,9 +325,10 @@ public class MessageReceivingTest
} }
@Override @Override
public void onError(Throwable cause) public void onError(Throwable cause, Callback callback)
{ {
LOG.warn(cause); LOG.warn(cause);
callback.succeeded();
} }
} }

View File

@ -30,19 +30,19 @@ import javax.websocket.Session;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket; import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionReasonSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@ -112,11 +112,11 @@ public class OnCloseTest
CompletableFuture<Session> futureSession = new CompletableFuture<>(); CompletableFuture<Session> futureSession = new CompletableFuture<>();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession); JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Execute onClose call // Execute onClose call
frameHandler.onFrame(CloseStatus.toFrame(CloseStatus.NORMAL), Callback.NOOP); frameHandler.onFrame(CloseStatus.toFrame(CloseStatus.NORMAL), Callback.NOOP);
frameHandler.onClosed(CloseStatus.NORMAL_STATUS); frameHandler.onClosed(CloseStatus.NORMAL_STATUS, Callback.NOOP);
// Test captured event // Test captured event
BlockingQueue<String> events = endpoint.events; BlockingQueue<String> events = endpoint.events;

View File

@ -33,12 +33,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.MessageType; import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.tests.SessionMatchers; import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler; import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint; import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
@ -47,20 +44,20 @@ import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; import org.eclipse.jetty.websocket.javax.tests.MessageType;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory; import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class SessionAddMessageHandlerTest public class SessionAddMessageHandlerTest
{ {
@ -88,7 +85,7 @@ public class SessionAddMessageHandlerTest
JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container); JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container);
CompletableFuture<Session> futureSession = new CompletableFuture<>(); CompletableFuture<Session> futureSession = new CompletableFuture<>();
frameHandler = frameHandlerFactory.newJavaxFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession); frameHandler = frameHandlerFactory.newJavaxFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Session // Session
session = frameHandler.getSession(); session = frameHandler.getSession();

View File

@ -35,12 +35,12 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -57,7 +57,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac
// Establish endpoint function // Establish endpoint function
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession); JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
func.accept(frameHandler); func.accept(frameHandler);
return socket; return socket;
} }

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.common; package org.eclipse.jetty.websocket.common;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -26,14 +31,19 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException; import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException;
import org.eclipse.jetty.websocket.core.*; import org.eclipse.jetty.websocket.core.BadPayloadException;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.UpgradeException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import java.lang.invoke.MethodHandle; public class JettyWebSocketFrameHandler implements FrameHandler
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
{ {
private final Logger log; private final Logger log;
private final Executor executor; private final Executor executor;
@ -104,63 +114,43 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onOpen(CoreSession coreSession, Callback callback)
{ {
}
@SuppressWarnings("Duplicates")
@Override
public void onError(Throwable cause)
{
cause = convertCause(cause);
futureSession.completeExceptionally(cause);
if (errorHandle == null)
{
log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
if (log.isDebugEnabled())
log.debug("unhandled", cause);
return;
}
try try
{ {
errorHandle.invoke(cause); customizer.customize(coreSession);
session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
if (textHandle != null)
textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
if (binaryHandle != null)
binarySink = JettyWebSocketFrameHandlerFactory
.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
if (openHandle != null)
openHandle.invoke();
futureSession.complete(session);
callback.succeeded();
} }
catch (Throwable t) catch (Throwable cause)
{ {
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t); // TODO should futureSession be failed here?
wsError.addSuppressed(cause); callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause));
throw wsError;
} }
} }
public static Throwable convertCause(Throwable cause)
{
if (cause instanceof MessageTooLargeException)
return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
if (cause instanceof ProtocolException)
return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
if (cause instanceof BadPayloadException)
return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
if (cause instanceof CloseException)
return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
if (cause instanceof WebSocketTimeoutException)
return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
if (cause instanceof InvalidSignatureException)
return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
if (cause instanceof UpgradeException)
return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
return cause;
}
/** /**
* @see #onFrame(Frame,Callback) * @see #onFrame(Frame,Callback)
*/ */
@ -185,61 +175,56 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
switch (frame.getOpCode()) switch (frame.getOpCode())
{ {
case OpCode.CLOSE: case OpCode.CLOSE:
onClose(frame, callback); onCloseFrame(frame, callback);
break; break;
case OpCode.PING: case OpCode.PING:
onPing(frame, callback); onPingFrame(frame, callback);
break; break;
case OpCode.PONG: case OpCode.PONG:
onPong(frame, callback); onPongFrame(frame, callback);
break; break;
case OpCode.TEXT: case OpCode.TEXT:
onText(frame, callback); onTextFrame(frame, callback);
break; break;
case OpCode.BINARY: case OpCode.BINARY:
onBinary(frame, callback); onBinaryFrame(frame, callback);
break; break;
case OpCode.CONTINUATION: case OpCode.CONTINUATION:
onContinuation(frame, callback); onContinuationFrame(frame, callback);
break; break;
} }
} }
@Override @Override
public void onOpen(CoreSession coreSession) public void onError(Throwable cause, Callback callback)
{ {
customizer.customize(coreSession); try
session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
if (textHandle != null)
textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
if (binaryHandle != null)
binarySink = JettyWebSocketFrameHandlerFactory.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
if (openHandle != null)
{ {
try cause = convertCause(cause);
{ futureSession.completeExceptionally(cause);
openHandle.invoke();
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
}
}
futureSession.complete(session); if (errorHandle != null)
errorHandle.invoke(cause);
else
{
log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
if (log.isDebugEnabled())
log.debug("unhandled", cause);
}
callback.succeeded();
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
}
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
callback.succeeded();
} }
public String toString() public String toString()
@ -259,7 +244,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
activeMessageSink = null; activeMessageSink = null;
} }
private void onBinary(Frame frame, Callback callback) private void onBinaryFrame(Frame frame, Callback callback)
{ {
if (activeMessageSink == null) if (activeMessageSink == null)
activeMessageSink = binarySink; activeMessageSink = binarySink;
@ -267,7 +252,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
acceptMessage(frame, callback); acceptMessage(frame, callback);
} }
private void onClose(Frame frame, Callback callback) private void onCloseFrame(Frame frame, Callback callback)
{ {
if (closeHandle != null) if (closeHandle != null)
{ {
@ -284,12 +269,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
callback.succeeded(); callback.succeeded();
} }
private void onContinuation(Frame frame, Callback callback) private void onContinuationFrame(Frame frame, Callback callback)
{ {
acceptMessage(frame, callback); acceptMessage(frame, callback);
} }
private void onPing(Frame frame, Callback callback) private void onPingFrame(Frame frame, Callback callback)
{ {
if (pingHandle != null) if (pingHandle != null)
{ {
@ -317,7 +302,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
callback.succeeded(); callback.succeeded();
} }
private void onPong(Frame frame, Callback callback) private void onPongFrame(Frame frame, Callback callback)
{ {
if (pongHandle != null) if (pongHandle != null)
{ {
@ -337,11 +322,39 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
callback.succeeded(); callback.succeeded();
} }
private void onText(Frame frame, Callback callback) private void onTextFrame(Frame frame, Callback callback)
{ {
if (activeMessageSink == null) if (activeMessageSink == null)
activeMessageSink = textSink; activeMessageSink = textSink;
acceptMessage(frame, callback); acceptMessage(frame, callback);
} }
static Throwable convertCause(Throwable cause)
{
if (cause instanceof MessageTooLargeException)
return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
if (cause instanceof ProtocolException)
return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
if (cause instanceof BadPayloadException)
return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
if (cause instanceof CloseException)
return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
if (cause instanceof WebSocketTimeoutException)
return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
if (cause instanceof InvalidSignatureException)
return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
if (cause instanceof UpgradeException)
return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
return cause;
}
} }

View File

@ -119,7 +119,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello?").setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello?").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP); localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@ -163,7 +163,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello Text Stream").setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello Text Stream").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP); localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@ -185,7 +185,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hel").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hel").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("lo ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("lo ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("Wor").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("Wor").setFin(false), Callback.NOOP);
@ -208,7 +208,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -238,7 +238,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -264,10 +264,10 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onError(new RuntimeException("Nothing to see here")); localEndpoint.onError(new RuntimeException("Nothing to see here"), Callback.NOOP);
// Validate Events // Validate Events
socket.events.assertEvents( socket.events.assertEvents(
@ -284,7 +284,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -314,7 +314,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events // Trigger Events
localEndpoint.onOpen(channel); localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.PING).setPayload("You there?"), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.PING).setPayload("You there?"), Callback.NOOP);

View File

@ -18,11 +18,6 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
@ -30,6 +25,11 @@ import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.server.Negotiation;
/** /**
* Interface for local WebSocket Endpoint Frame handling. * Interface for local WebSocket Endpoint Frame handling.
* *
@ -124,74 +124,6 @@ public interface FrameHandler extends IncomingFrames
return false; return false;
} }
interface Adaptor extends FrameHandler
{
@Override
default void onOpen(CoreSession coreSession, Callback callback)
{
try
{
onOpen(coreSession);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onOpen(CoreSession coreSession) throws Exception {}
@Override
default void onFrame(Frame frame, Callback callback)
{
try
{
onFrame(frame);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onFrame(Frame frame) throws Exception {}
@Override
default void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
onClosed(closeStatus);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onClosed(CloseStatus closeStatus) throws Exception {}
@Override
default void onError(Throwable cause, Callback callback)
{
try
{
onError(cause);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onError(Throwable cause) throws Exception {}
}
interface Configuration interface Configuration
{ {
@ -568,7 +500,7 @@ public interface FrameHandler extends IncomingFrames
@Override @Override
public Duration getIdleTimeout() public Duration getIdleTimeout()
{ {
return timeout==null ? Duration.ofSeconds(0) : timeout; return timeout==null ? Duration.ZERO : timeout;
} }
@Override @Override

View File

@ -18,14 +18,18 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.*;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/** /**
* A utility implementation of FrameHandler that defragments * A utility implementation of FrameHandler that defragments
* text frames into a String message before calling {@link #onText(String, Callback)}. * text frames into a String message before calling {@link #onText(String, Callback)}.
@ -33,7 +37,7 @@ import java.util.function.Consumer;
* may extend {@link #isDemanding()} to return true and then explicityly control * may extend {@link #isDemanding()} to return true and then explicityly control
* demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)} * demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)}
*/ */
public class MessageHandler implements FrameHandler.Adaptor public class MessageHandler implements FrameHandler
{ {
public static MessageHandler from(Consumer<String> onText, Consumer<ByteBuffer> onBinary) public static MessageHandler from(Consumer<String> onText, Consumer<ByteBuffer> onBinary)
@ -124,17 +128,18 @@ public class MessageHandler implements FrameHandler.Adaptor
this.maxBinaryMessageSize = maxBinaryMessageSize; this.maxBinaryMessageSize = maxBinaryMessageSize;
} }
@Override
public void onOpen(CoreSession coreSession) throws Exception
{
this.coreSession = coreSession;
}
public CoreSession getCoreSession() public CoreSession getCoreSession()
{ {
return coreSession; return coreSession;
} }
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
this.coreSession = coreSession;
callback.succeeded();
}
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
@ -226,7 +231,38 @@ public class MessageHandler implements FrameHandler.Adaptor
} }
@Override @Override
public final void onFrame(Frame frame){} public void onError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus);
if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
if (binaryMessage != null)
{
if (BufferUtil.hasContent(binaryMessage))
LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
}
if (utf8StringBuilder != null)
{
utf8StringBuilder.reset();
utf8StringBuilder = null;
}
coreSession = null;
callback.succeeded();
}
private void onTextFrame(Frame frame, Callback callback) private void onTextFrame(Frame frame, Callback callback)
{ {
@ -422,36 +458,4 @@ public class MessageHandler implements FrameHandler.Adaptor
} }
}.iterate(); }.iterate();
} }
@Override
public void onClosed(CloseStatus closeStatus) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus);
if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
if (binaryMessage != null)
{
if (BufferUtil.hasContent(binaryMessage))
LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
}
if (utf8StringBuilder != null)
{
utf8StringBuilder.reset();
utf8StringBuilder = null;
}
coreSession = null;
}
@Override
public void onError(Throwable cause) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
}
} }

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -40,6 +30,28 @@ import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
/** /**
@ -469,9 +481,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public void sendFrame(Frame frame, Callback callback, boolean batch) public void sendFrame(Frame frame, Callback callback, boolean batch)
{ {
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
try try
{ {
assertValidOutgoing(frame); assertValidOutgoing(frame);
@ -487,26 +496,29 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
synchronized(flusher) synchronized(flusher)
{ {
boolean closeConnection = channelState.onOutgoingFrame(frame); boolean closeConnection = channelState.onOutgoingFrame(frame);
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection);
if (frame.getOpCode() == OpCode.CLOSE) if (closeConnection)
{ {
if (LOG.isDebugEnabled()) Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame));
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (closeConnection) Callback closeConnectionCallback = Callback.from(
{ ()->closeConnection(cause, channelState.getCloseStatus(), callback),
callback = new Callback.Nested(callback) x->closeConnection(cause, channelState.getCloseStatus(), Callback.from(
{ ()-> callback.failed(x),
@Override x2->
public void completed() {
{ x.addSuppressed(x2);
closeConnection(AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)), channelState.getCloseStatus(),NOOP); callback.failed(x);
} })));
};
} flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
}
else
{
flusher.queue.offer(new FrameEntry(frame, callback, batch));
} }
flusher.queue.offer(new FrameEntry(frame, callback, batch));
} }
flusher.iterate(); flusher.iterate();
} }

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import java.nio.ByteBuffer;
import static org.eclipse.jetty.websocket.core.OpCode.PONG; import static org.eclipse.jetty.websocket.core.OpCode.PONG;
/** /**
@ -35,7 +35,7 @@ import static org.eclipse.jetty.websocket.core.OpCode.PONG;
* NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some. * NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some.
* </p> * </p>
*/ */
public class AbstractTestFrameHandler implements FrameHandler.Adaptor public class AbstractTestFrameHandler implements SynchronousFrameHandler
{ {
private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class); private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class);
private byte partial = OpCode.UNDEFINED; private byte partial = OpCode.UNDEFINED;

View File

@ -111,7 +111,7 @@ public class MessageHandlerTest
} }
}; };
handler.onOpen(session); handler.onOpen(session, NOOP);
} }
@Test @Test
@ -350,7 +350,7 @@ public class MessageHandlerTest
FutureCallback callback; FutureCallback callback;
handler.setMaxTextMessageSize(4); handler.setMaxTextMessageSize(4);
handler.onOpen(session); handler.onOpen(session, NOOP);
callback = new FutureCallback(); callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, "Testing"), callback); handler.onFrame(new Frame(OpCode.TEXT, true, "Testing"), callback);
@ -369,7 +369,7 @@ public class MessageHandlerTest
FutureCallback callback; FutureCallback callback;
handler.setMaxTextMessageSize(4); handler.setMaxTextMessageSize(4);
handler.onOpen(session); handler.onOpen(session, NOOP);
callback = new FutureCallback(); callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, "123"), callback); handler.onFrame(new Frame(OpCode.TEXT, false, "123"), callback);
@ -570,7 +570,7 @@ public class MessageHandlerTest
FutureCallback callback; FutureCallback callback;
handler.setMaxBinaryMessageSize(4); handler.setMaxBinaryMessageSize(4);
handler.onOpen(session); handler.onOpen(session, NOOP);
callback = new FutureCallback(); callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, true, "Testing"), callback); handler.onFrame(new Frame(OpCode.BINARY, true, "Testing"), callback);
@ -589,7 +589,7 @@ public class MessageHandlerTest
FutureCallback callback; FutureCallback callback;
handler.setMaxBinaryMessageSize(4); handler.setMaxBinaryMessageSize(4);
handler.onOpen(session); handler.onOpen(session, NOOP);
callback = new FutureCallback(); callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, false, "123"), callback); handler.onFrame(new Frame(OpCode.BINARY, false, "123"), callback);
@ -653,7 +653,7 @@ public class MessageHandlerTest
} }
}; };
handler.onOpen(session); handler.onOpen(session, NOOP);
FutureCallback callback; FutureCallback callback;
@ -681,7 +681,7 @@ public class MessageHandlerTest
} }
}; };
handler.onOpen(session); handler.onOpen(session, NOOP);
FutureCallback callback; FutureCallback callback;

View File

@ -0,0 +1,87 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.Callback;
public interface SynchronousFrameHandler extends FrameHandler
{
@Override
default void onOpen(CoreSession coreSession, Callback callback)
{
try
{
onOpen(coreSession);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onOpen(CoreSession coreSession) throws Exception {}
@Override
default void onFrame(Frame frame, Callback callback)
{
try
{
onFrame(frame);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onFrame(Frame frame) throws Exception {}
@Override
default void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
onClosed(closeStatus);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onClosed(CloseStatus closeStatus) throws Exception {}
@Override
default void onError(Throwable cause, Callback callback)
{
try
{
onError(cause);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onError(Throwable cause) throws Exception {}
}

View File

@ -18,18 +18,18 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import java.util.concurrent.BlockingQueue; public class TestFrameHandler implements SynchronousFrameHandler
import java.util.concurrent.CountDownLatch;
public class TestFrameHandler implements FrameHandler.Adaptor
{ {
private static Logger LOG = Log.getLogger(TestFrameHandler.class); private static Logger LOG = Log.getLogger(SynchronousFrameHandler.class);
private CoreSession session; private CoreSession session;

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -39,16 +44,14 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
* Tests of a core server with a fake client * Tests of a core server with a fake client
@ -352,7 +355,7 @@ public class WebSocketCloseTest extends WebSocketTester
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames")); assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
} }
static class TestFrameHandler implements FrameHandler.Adaptor static class TestFrameHandler implements SynchronousFrameHandler
{ {
private CoreSession session; private CoreSession session;
String state; String state;

View File

@ -18,6 +18,12 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -39,16 +45,14 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.*; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
* Tests of a core server with a fake client * Tests of a core server with a fake client
@ -83,7 +87,7 @@ public class WebSocketOpenTest extends WebSocketTester
setup((s,c)-> setup((s,c)->
{ {
assertThat(s.toString(),containsString("CONNECTED")); assertThat(s.toString(),containsString("CONNECTED"));
TestFrameHandler.sendText(s,"Hello", c); WebSocketOpenTest.TestFrameHandler.sendText(s,"Hello", c);
s.demand(1); s.demand(1);
return null; return null;
}); });
@ -170,7 +174,7 @@ public class WebSocketOpenTest extends WebSocketTester
Thread.sleep(100); Thread.sleep(100);
// Can send while onOpen is active // Can send while onOpen is active
TestFrameHandler.sendText(session,"Hello", NOOP); WebSocketOpenTest.TestFrameHandler.sendText(session,"Hello", NOOP);
Parser.ParsedFrame frame = receiveFrame(client.getInputStream()); Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello")); assertThat(frame.getPayloadAsUTF8(),is("Hello"));
@ -203,7 +207,7 @@ public class WebSocketOpenTest extends WebSocketTester
static class TestFrameHandler implements FrameHandler.Adaptor static class TestFrameHandler implements SynchronousFrameHandler
{ {
private CoreSession session; private CoreSession session;
private BiFunction<CoreSession,Callback,Void> onOpen; private BiFunction<CoreSession,Callback,Void> onOpen;
@ -351,7 +355,7 @@ public class WebSocketOpenTest extends WebSocketTester
public void sendText(String text) public void sendText(String text)
{ {
LOG.info("sending {}...", text); LOG.info("sending {}...", text);
TestFrameHandler.sendText(handler.session, text); WebSocketOpenTest.TestFrameHandler.sendText(handler.session, text);
} }
public void close() public void close()

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.core.chat; package org.eclipse.jetty.websocket.core.chat;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -34,14 +43,6 @@ import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
public class ChatWebSocketServer public class ChatWebSocketServer
@ -67,14 +68,12 @@ public class ChatWebSocketServer
// + MUST return the FrameHandler or null or exception? // + MUST return the FrameHandler or null or exception?
return new MessageHandler() return new MessageHandler()
{ {
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
LOG.debug("onOpen {}", coreSession); LOG.debug("onOpen {}", coreSession);
setMaxTextMessageSize(2 * 1024); setMaxTextMessageSize(2 * 1024);
super.onOpen(coreSession); super.onOpen(coreSession, Callback.from(()->{members.add(this); callback.succeeded();},x->callback.failed(x)));
members.add(this);
} }
@Override @Override
@ -92,10 +91,10 @@ public class ChatWebSocketServer
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) throws Exception public void onClosed(CloseStatus closeStatus, Callback callback)
{ {
LOG.debug("onClosed {}", closeStatus); LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus); super.onClosed(closeStatus, Callback.from(()->members.remove(this),callback));
members.remove(this); members.remove(this);
} }
}; };