Issue #3290 async onOpen, onClose and onError

Gave onOpen, onError and onClose callback signatures
Illegal to ask for demand prior to onOpen success
added tests for various onOpen scenarios

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-26 15:34:50 +11:00
parent 330a3e7f58
commit 8b93922d08
25 changed files with 612 additions and 456 deletions

View File

@ -18,24 +18,21 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.*;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.http.*;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.net.URI; import java.net.*;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException; import java.nio.charset.UnsupportedCharsetException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -43,21 +40,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
public class HttpRequest implements Request public class HttpRequest implements Request
{ {
private static final URI NULL_URI = URI.create("null:0"); private static final URI NULL_URI = URI.create("null:0");
@ -876,6 +858,6 @@ public class HttpRequest implements Request
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s[%s %s %s]@%x", HttpRequest.class.getSimpleName(), getMethod(), getPath(), getVersion(), hashCode()); return String.format("%s[%s %s %s]@%x", this.getClass().getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
} }
} }

View File

@ -18,6 +18,20 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import org.eclipse.jetty.http.*;
import org.eclipse.jetty.io.*;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -32,33 +46,6 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/** /**
* HttpChannel represents a single endpoint for HTTP semantic processing. * HttpChannel represents a single endpoint for HTTP semantic processing.
* The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from * The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from
@ -574,6 +561,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
*/ */
protected void handleException(Throwable failure) protected void handleException(Throwable failure)
{ {
LOG.warn(failure);
// Unwrap wrapping Jetty and Servlet exceptions. // Unwrap wrapping Jetty and Servlet exceptions.
Throwable quiet = unwrap(failure, QuietException.class); Throwable quiet = unwrap(failure, QuietException.class);
Throwable no_stack = unwrap(failure, BadMessageException.class, IOException.class, TimeoutException.class); Throwable no_stack = unwrap(failure, BadMessageException.class, IOException.class, TimeoutException.class);

View File

@ -56,8 +56,7 @@ public class ClientUpgradeRequestImpl extends org.eclipse.jetty.websocket.core.c
UpgradeRequest upgradeRequest = new DelegatedClientUpgradeRequest(this); UpgradeRequest upgradeRequest = new DelegatedClientUpgradeRequest(this);
UpgradeResponse upgradeResponse = new DelegatedClientUpgradeResponse(response); UpgradeResponse upgradeResponse = new DelegatedClientUpgradeResponse(response);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureJavaxSession);
upgradeRequest, upgradeResponse, futureJavaxSession);
return frameHandler; return frameHandler;
} }

View File

@ -18,33 +18,23 @@
package org.eclipse.jetty.websocket.javax.client; package org.eclipse.jetty.websocket.javax.client;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.Session;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint; import org.eclipse.jetty.websocket.javax.common.*;
import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer; import javax.websocket.*;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig; import java.io.IOException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/** /**
* Container for Client use of the javax.websocket API. * Container for Client use of the javax.websocket API.
@ -171,8 +161,11 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
try try
{ {
Future<Session> sessionFuture = connect(upgradeRequest); Future<Session> sessionFuture = connect(upgradeRequest);
// TODO: apply connect timeouts here? long timeout = getDefaultMaxSessionIdleTimeout();
return sessionFuture.get(); // TODO: unwrap IOException from ExecutionException? if (timeout>0)
return sessionFuture.get(timeout, TimeUnit.MILLISECONDS);
return sessionFuture.get();
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -18,47 +18,26 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders; import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryMessageSink; import org.eclipse.jetty.websocket.javax.common.messages.*;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryStreamMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextStreamMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialByteArrayMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialByteBufferMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.PartialStringMessageSink;
import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils; import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils;
public class JavaxWebSocketFrameHandler implements FrameHandler import javax.websocket.MessageHandler;
import javax.websocket.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor
{ {
private final Logger LOG; private final Logger LOG;
private final JavaxWebSocketContainer container; private final JavaxWebSocketContainer container;
@ -275,7 +254,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
} }
catch (Throwable cause) catch (Throwable cause)
{ {
throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause); Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
// TODO This feels like double handling of the exception? Review need for futureSession
futureSession.completeExceptionally(wse);
throw wse;
} }
} }
@ -283,6 +266,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
futureSession.complete(session); futureSession.complete(session);
} }
/**
* @see #onFrame(Frame,Callback)
*/
public final void onFrame(Frame frame) {}
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {

View File

@ -18,20 +18,6 @@
package org.eclipse.jetty.websocket.javax.server; package org.eclipse.jetty.websocket.javax.server;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import javax.websocket.EndpointConfig;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec; import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -52,6 +38,19 @@ import org.eclipse.jetty.websocket.javax.server.internal.JavaxWebSocketCreator;
import org.eclipse.jetty.websocket.javax.server.internal.UndefinedServerEndpointConfig; import org.eclipse.jetty.websocket.javax.server.internal.UndefinedServerEndpointConfig;
import org.eclipse.jetty.websocket.servlet.WebSocketMapping; import org.eclipse.jetty.websocket.servlet.WebSocketMapping;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import javax.websocket.EndpointConfig;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@ManagedObject("JSR356 Server Container") @ManagedObject("JSR356 Server Container")
public class JavaxWebSocketServerContainer public class JavaxWebSocketServerContainer
extends JavaxWebSocketClientContainer extends JavaxWebSocketClientContainer
@ -310,9 +309,10 @@ public class JavaxWebSocketServerContainer
@Override @Override
public int getDefaultMaxBinaryMessageBufferSize() public int getDefaultMaxBinaryMessageBufferSize()
{ {
// TODO: warn on long -> int conversion issue long max = customizer.getMaxBinaryMessageSize();
// TODO: Should this be Filter? if (max > (long)Integer.MAX_VALUE)
return (int)customizer.getMaxBinaryMessageSize(); return Integer.MAX_VALUE;
return (int)max;
} }
@Override @Override
@ -324,8 +324,10 @@ public class JavaxWebSocketServerContainer
@Override @Override
public int getDefaultMaxTextMessageBufferSize() public int getDefaultMaxTextMessageBufferSize()
{ {
// TODO: warn on long -> int conversion issue long max = customizer.getMaxTextMessageSize();
return (int)customizer.getMaxTextMessageSize(); if (max > (long)Integer.MAX_VALUE)
return Integer.MAX_VALUE;
return (int)max;
} }
@Override @Override

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.websocket.javax.tests; package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -42,6 +32,16 @@ import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator; import org.eclipse.jetty.websocket.core.internal.Generator;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{ {
private final LocalServer server; private final LocalServer server;
@ -217,7 +217,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
} }
} }
public static class FrameCapture implements FrameHandler public static class FrameCapture implements FrameHandler.Adaptor
{ {
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>(); private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint; private final EndPoint endPoint;

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
public class FrameEcho implements FrameHandler public class FrameEcho implements FrameHandler.Adaptor
{ {
private Logger LOG = Log.getLogger(FrameEcho.class); private Logger LOG = Log.getLogger(FrameEcho.class);

View File

@ -52,7 +52,7 @@ public class FrameHandlerTracker extends MessageHandler
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onClosed(CloseStatus closeStatus) throws Exception
{ {
super.onClosed(closeStatus); super.onClosed(closeStatus);

View File

@ -18,13 +18,6 @@
package org.eclipse.jetty.websocket.javax.tests.client.misbehaving; package org.eclipse.jetty.websocket.javax.tests.client.misbehaving;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.javax.tests.CoreServer; import org.eclipse.jetty.websocket.javax.tests.CoreServer;
@ -32,6 +25,12 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -99,7 +98,7 @@ public class MisbehavingClassTest
Exception e = assertThrows(IOException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()"); Exception e = assertThrows(IOException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()");
assertThat(e.getCause(), instanceOf(ExecutionException.class)); assertThat(e.getCause(), instanceOf(ExecutionException.class));
assertThat("Close should have occurred", socket.closeLatch.await(1, TimeUnit.SECONDS), is(true)); assertThat("Close should have occurred", socket.closeLatch.await(5, TimeUnit.SECONDS), is(true));
Throwable cause = socket.errors.pop(); Throwable cause = socket.errors.pop();
assertThat("Error", cause, instanceOf(RuntimeException.class)); assertThat("Error", cause, instanceOf(RuntimeException.class));

View File

@ -26,24 +26,14 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException; import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException;
import org.eclipse.jetty.websocket.core.BadPayloadException; import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.UpgradeException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
public class JettyWebSocketFrameHandler implements FrameHandler public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
{ {
private final Logger log; private final Logger log;
private final Executor executor; private final Executor executor;
@ -116,7 +106,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onClosed(CloseStatus closeStatus)
{ {
// TODO: FrameHandler cleanup?
} }
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@ -172,6 +161,11 @@ public class JettyWebSocketFrameHandler implements FrameHandler
return cause; return cause;
} }
/**
* @see #onFrame(Frame,Callback)
*/
public final void onFrame(Frame frame) {}
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable; import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
/** /**
* Representation of a WebSocket Close (status code &amp; reason) * Representation of a WebSocket Close (status code &amp; reason)
*/ */
@ -355,5 +355,11 @@ public class CloseStatus
{ {
return CloseStatus.this; return CloseStatus.this;
} }
@Override
public String toString()
{
return super.toString() + ":" + CloseStatus.this.toString();
}
} }
} }

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
@ -25,11 +30,6 @@ import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
import org.eclipse.jetty.websocket.core.server.Negotiation;
/** /**
* Interface for local WebSocket Endpoint Frame handling. * Interface for local WebSocket Endpoint Frame handling.
* *
@ -49,33 +49,31 @@ import org.eclipse.jetty.websocket.core.server.Negotiation;
* Once instantiated the FrameHandler follows is used as follows: * Once instantiated the FrameHandler follows is used as follows:
* </p> * </p>
* <ul> * <ul>
* <li>The {@link #onOpen(CoreSession)} method is called when negotiation of the connection is completed. The passed {@link CoreSession} instance is used * <li>The {@link #onOpen(CoreSession,Callback)} method is called when negotiation of the connection is completed. The passed {@link CoreSession} instance is used
* to obtain information about the connection and to send frames</li> * to obtain information about the connection and to send frames</li>
* <li>Every data and control frame received is passed to {@link #onFrame(Frame, Callback)}.</li> * <li>Every data and control frame received is passed to {@link #onFrame(Frame, Callback)}.</li>
* <li>Received Control Frames that require a response (eg Ping, Close) are first passed to the {@link #onFrame(Frame, Callback)} to give the * <li>Received Control Frames that require a response (eg Ping, Close) are first passed to the {@link #onFrame(Frame, Callback)} to give the
* Application an opportunity to send the response itself. If an appropriate response has not been sent when the callback passed is completed, then a * Application an opportunity to send the response itself. If an appropriate response has not been sent when the callback passed is completed, then a
* response will be generated.</li> * response will be generated.</li>
* <li>If an error is detected or received, then {@link #onError(Throwable)} will be called to inform the application of the cause of the problem. * <li>If an error is detected or received, then {@link #onError(Throwable,Callback)} will be called to inform the application of the cause of the problem.
* The connection will then be closed or aborted and the {@link #onClosed(CloseStatus)} method called.</li> * The connection will then be closed or aborted and the {@link #onClosed(CloseStatus,Callback)} method called.</li>
* <li>The {@link #onClosed(CloseStatus)} method is always called once a websocket connection is terminated, either gracefully or not. The error code * <li>The {@link #onClosed(CloseStatus,Callback)} method is always called once a websocket connection is terminated, either gracefully or not. The error code
* will indicate the nature of the close.</li> * will indicate the nature of the close.</li>
* </ul> * </ul>
*/ */
public interface FrameHandler extends IncomingFrames public interface FrameHandler extends IncomingFrames
{ {
// TODO: have conversation about "throws Exception" vs "throws WebSocketException" vs "throws Throwable" in below signatures.
/** /**
* Connection is being opened. * Async notification that Connection is being opened.
* <p> * <p>
* FrameHandler can write during this call, but will not receive frames until * FrameHandler can write during this call, but will not receive frames until
* the onOpen() completes. * the onOpen() completes.
* </p> * </p>
* *
* @param coreSession the channel associated with this connection. * @param coreSession the channel associated with this connection.
* @throws Exception if unable to open. TODO: will close the connection (optionally choosing close status code based on WebSocketException type)? * @param callback the callback to indicate success in processing (or failure)
*/ */
void onOpen(CoreSession coreSession) throws Exception; void onOpen(CoreSession coreSession, Callback callback);
/** /**
* Receiver of all Frames. * Receiver of all Frames.
@ -100,18 +98,19 @@ public interface FrameHandler extends IncomingFrames
* </p> * </p>
* *
* @param closeStatus the close status received from remote, or in the case of abnormal closure from local. * @param closeStatus the close status received from remote, or in the case of abnormal closure from local.
* @param callback the callback to indicate success in processing (or failure)
*/ */
void onClosed(CloseStatus closeStatus); void onClosed(CloseStatus closeStatus, Callback callback);
/** /**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler. * An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus)} giving the close status * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
* derived from the error. * derived from the error.
* *
* @param cause the reason for the error * @param cause the reason for the error
* @throws Exception if unable to process the error. * @param callback the callback to indicate success in processing (or failure)
*/ */
void onError(Throwable cause) throws Exception; void onError(Throwable cause, Callback callback);
/** /**
* Does the FrameHandler manage it's own demand? * Does the FrameHandler manage it's own demand?
@ -126,6 +125,73 @@ public interface FrameHandler extends IncomingFrames
} }
interface Adaptor extends FrameHandler
{
@Override
default void onOpen(CoreSession coreSession, Callback callback)
{
try
{
onOpen(coreSession);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onOpen(CoreSession coreSession) throws Exception {}
@Override
default void onFrame(Frame frame, Callback callback)
{
try
{
onFrame(frame);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onFrame(Frame frame) throws Exception {}
@Override
default void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
onClosed(closeStatus);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onClosed(CloseStatus closeStatus) throws Exception {}
@Override
default void onError(Throwable cause, Callback callback)
{
try
{
onError(cause);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onError(Throwable cause) throws Exception {}
}
interface Configuration interface Configuration
{ {
@ -168,6 +234,7 @@ public interface FrameHandler extends IncomingFrames
void setMaxTextMessageSize(long maxSize); void setMaxTextMessageSize(long maxSize);
} }
/** /**
* Represents the outgoing Frames. * Represents the outgoing Frames.
*/ */
@ -225,10 +292,10 @@ public interface FrameHandler extends IncomingFrames
* </p> * </p>
* <p> * <p>
* Once called, any read/write activity on the websocket from this point will be indeterminate. * Once called, any read/write activity on the websocket from this point will be indeterminate.
* This can result in the {@link #onError(Throwable)} event being called indicating any issue that arises. * This can result in the {@link #onError(Throwable,Callback)} event being called indicating any issue that arises.
* </p> * </p>
* <p> * <p>
* Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus)} event will be called. * Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus,Callback)} event will be called.
* </p> * </p>
*/ */
void abort(); void abort();
@ -501,7 +568,7 @@ public interface FrameHandler extends IncomingFrames
@Override @Override
public Duration getIdleTimeout() public Duration getIdleTimeout()
{ {
return timeout; return timeout==null ? Duration.ofSeconds(0) : timeout;
} }
@Override @Override

View File

@ -18,11 +18,7 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.*;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -37,7 +33,7 @@ import java.util.function.Consumer;
* may extend {@link #isDemanding()} to return true and then explicityly control * may extend {@link #isDemanding()} to return true and then explicityly control
* demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)} * demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)}
*/ */
public class MessageHandler implements FrameHandler public class MessageHandler implements FrameHandler.Adaptor
{ {
public static MessageHandler from(Consumer<String> onText, Consumer<ByteBuffer> onBinary) public static MessageHandler from(Consumer<String> onText, Consumer<ByteBuffer> onBinary)
@ -229,6 +225,9 @@ public class MessageHandler implements FrameHandler
} }
} }
@Override
public final void onFrame(Frame frame){}
private void onTextFrame(Frame frame, Callback callback) private void onTextFrame(Frame frame, Callback callback)
{ {
if (frame.hasPayload()) if (frame.hasPayload())
@ -298,8 +297,8 @@ public class MessageHandler implements FrameHandler
/** /**
* Method called when a complete text message is received. * Method called when a complete text message is received.
* *
* @param message * @param message the received text payload
* @param callback * @param callback The callback to signal completion of handling.
*/ */
protected void onText(String message, Callback callback) protected void onText(String message, Callback callback)
{ {
@ -309,8 +308,8 @@ public class MessageHandler implements FrameHandler
/** /**
* Method called when a complete binary message is received. * Method called when a complete binary message is received.
* *
* @param message * @param message The binary payload
* @param callback * @param callback The callback to signal completion of handling.
*/ */
protected void onBinary(ByteBuffer message, Callback callback) protected void onBinary(ByteBuffer message, Callback callback)
{ {
@ -425,7 +424,7 @@ public class MessageHandler implements FrameHandler
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onClosed(CloseStatus closeStatus) throws Exception
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus); LOG.debug("{} onClosed {}", this, closeStatus);

View File

@ -18,23 +18,13 @@
package org.eclipse.jetty.websocket.core.client; package org.eclipse.jetty.websocket.core.client;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.*;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionUpgrader; import org.eclipse.jetty.client.http.HttpConnectionUpgrader;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.*;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.B64Code;
@ -42,17 +32,8 @@ import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.internal.*;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.UpgradeException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
@ -344,10 +325,15 @@ public abstract class UpgradeRequest extends HttpRequest implements Response.Com
notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response)); notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));
// Now swap out the connection // Now swap out the connection
try
{
endp.upgrade(wsConnection); endp.upgrade(wsConnection);
}
finally
{
futureCoreSession.complete(wsChannel); futureCoreSession.complete(wsChannel);
} }
}
/** /**
* Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive) * Allow for overridden customization of endpoint (such as special transport level properties: e.g. TCP keepAlive)

View File

@ -23,16 +23,10 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import java.io.Closeable; import java.io.Closeable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Supplier;
/** /**
* Parsing of a frames in WebSocket land. * Parsing of a frames in WebSocket land.
@ -415,5 +409,13 @@ public class Parser
{ {
return releaseable; return releaseable;
} }
@Override
public String toString()
{
if (closeStatus==null)
return super.toString();
return super.toString() + ":" + closeStatus;
}
} }
} }

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -30,27 +40,7 @@ import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool; import static org.eclipse.jetty.util.Callback.NOOP;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
/** /**
* The Core WebSocket Session. * The Core WebSocket Session.
@ -262,7 +252,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public void close(Callback callback) public void close(Callback callback)
{ {
close(NO_CODE, callback, false); close(NO_CODE, callback);
} }
/** /**
@ -275,12 +265,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public void close(int statusCode, String reason, Callback callback) public void close(int statusCode, String reason, Callback callback)
{ {
close(new CloseStatus(statusCode, reason), callback, false); close(new CloseStatus(statusCode, reason), callback);
} }
private void close(CloseStatus closeStatus, Callback callback, boolean batch) private void close(CloseStatus closeStatus, Callback callback)
{ {
sendFrame(closeStatus.toFrame(), callback, batch); sendFrame(closeStatus.toFrame(), callback, false);
} }
@Override @Override
@ -293,39 +283,57 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString()); CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
if (channelState.onClosed(closeStatus)) if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus); closeConnection(cause, closeStatus, NOOP);
} }
public void closeConnection(Throwable cause, CloseStatus closeStatus) public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
{ {
connection.cancelDemand(); connection.cancelDemand();
if (connection.getEndPoint().isOpen())
connection.close();
// Forward Errors to Local WebSocket EndPoint // Forward Errors to Local WebSocket EndPoint
if (cause!=null) if (cause!=null)
{
Callback errorCallback = Callback.from(()->
{ {
try try
{ {
handler.onError(cause); handler.onClosed(closeStatus, callback);
}
catch (Throwable e)
{
LOG.warn(e);
callback.failed(e);
}
});
try
{
handler.onError(cause,errorCallback);
} }
catch (Throwable e) catch (Throwable e)
{ {
if (e != cause) if (e != cause)
cause.addSuppressed(e); cause.addSuppressed(e);
LOG.warn(cause); LOG.warn(cause);
errorCallback.failed(cause);
} }
} }
else
{
try try
{ {
handler.onClosed(closeStatus); handler.onClosed(closeStatus, callback);
} }
catch (Throwable e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
callback.failed(e);
}
} }
if (connection.getEndPoint().isOpen())
connection.close();
} }
AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause) AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause)
@ -344,7 +352,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
else else
code = CloseStatus.SERVER_ERROR; code = CloseStatus.SERVER_ERROR;
return new AbnormalCloseStatus(code, cause.getMessage()); return new AbnormalCloseStatus(code, cause);
} }
/** /**
@ -353,8 +361,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
* otherwise just close the connection. * otherwise just close the connection.
* *
* @param cause the cause * @param cause the cause
* @param callback the callback on completion of error handling
*/ */
public void processConnectionError(Throwable cause) public void processConnectionError(Throwable cause, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause); LOG.debug("processConnectionError {} {}", this, cause);
@ -362,9 +371,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
CloseStatus closeStatus = abnormalCloseStatusFor(cause); CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL) if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, Callback.NOOP, false); close(closeStatus, NOOP);
else if (channelState.onClosed(closeStatus)) else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus); closeConnection(cause, closeStatus, callback);
} }
/** /**
@ -372,13 +381,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
* Send an abnormal close frame to ensure connection is closed. * Send an abnormal close frame to ensure connection is closed.
* *
* @param cause the cause * @param cause the cause
* @param callback the callback on completion of error handling
*/ */
public void processHandlerError(Throwable cause) public void processHandlerError(Throwable cause, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause); LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), Callback.NOOP, false); close(abnormalCloseStatusFor(cause), callback);
} }
/** /**
@ -389,43 +399,44 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onOpen() {}", this); LOG.debug("onOpen() {}", this);
try
{
// Upgrade success // Upgrade success
channelState.onConnected(); channelState.onConnected();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED"); LOG.debug("ConnectionState: Transition to CONNECTED");
// Open connection and handler Callback openCallback = Callback.from(()->
handler.onOpen(this); {
channelState.onOpen(); channelState.onOpen();
if (!demanding) if (!demanding)
connection.demand(1); connection.demand(1);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to OPEN"); LOG.debug("ConnectionState: Transition to OPEN");
},
x->
{
LOG.warn("Error during OPEN", x);
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP);
});
try
{
// Open connection and handler
handler.onOpen(this, openCallback);
} }
catch (Throwable t) catch (Throwable t)
{ {
LOG.warn("Error during OPEN", t); openCallback.failed(t);
try
{
handler.onError(t);
}
catch (Exception e)
{
t.addSuppressed(e);
}
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t));
} }
} }
@Override @Override
public void demand(long n) public void demand(long n)
{ {
if (!demanding) if (!demanding)
throw new IllegalStateException(); throw new IllegalStateException("FrameHandler is not demanding: " + this);
if (!channelState.isInputOpen())
throw new IllegalStateException("FrameHandler input not open: " + this); // TODO Perhaps this is a NOOP?
connection.demand(n); connection.demand(n);
} }
@ -489,7 +500,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public void completed() public void completed()
{ {
closeConnection(null, channelState.getCloseStatus()); closeConnection(AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)), channelState.getCloseStatus(),NOOP);
} }
}; };
} }
@ -511,7 +522,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus) if (closeStatus instanceof AbnormalCloseStatus)
closeConnection(null, closeStatus); closeConnection(null, closeStatus, NOOP);
} }
} }
} }
@ -609,7 +620,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private class IncomingAdaptor implements IncomingFrames private class IncomingAdaptor implements IncomingFrames
{ {
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, final Callback callback)
{ {
try try
{ {
@ -619,48 +630,41 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
boolean closeConnection = channelState.onIncomingFrame(frame); boolean closeConnection = channelState.onIncomingFrame(frame);
// Handle inbound close // Handle inbound frame
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() != OpCode.CLOSE)
{ {
connection.cancelDemand();
if (closeConnection)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
handler.onClosed(channelState.getCloseStatus());
connection.close();
}
};
handler.onFrame(frame, callback); handler.onFrame(frame, callback);
return; return;
} }
callback = new Callback.Nested(callback) // Handle inbound CLOSE
connection.cancelDemand();
Callback closeCallback ;
if (closeConnection)
{ {
@Override closeCallback = Callback.from(()-> closeConnection(null, channelState.getCloseStatus(), callback));
public void completed() }
else
{
closeCallback = Callback.from(()->
{ {
if (channelState.isOutputOpen()) if (channelState.isOutputOpen())
{ {
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus); LOG.debug("ConnectionState: sending close response {}", closeStatus);
close(closeStatus==null ? CloseStatus.NO_CODE_STATUS : closeStatus, callback);
// this may race with a rare application close but errors are ignored
if (closeStatus==null)
closeStatus = CloseStatus.NO_CODE_STATUS;
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
} }
else
{
callback.succeeded();
} }
}; },
x->processHandlerError(x,callback));
} }
// Handle the frame handler.onFrame(frame, closeCallback);
handler.onFrame(frame, callback);
} }
catch (Throwable t) catch (Throwable t)
{ {
@ -753,9 +757,29 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
static class AbnormalCloseStatus extends CloseStatus static class AbnormalCloseStatus extends CloseStatus
{ {
public AbnormalCloseStatus(int statusCode, String reasonPhrase) final Throwable cause;
public AbnormalCloseStatus(int statusCode, Throwable cause)
{ {
super(statusCode, reasonPhrase); super(statusCode, cause.getMessage());
this.cause = cause;
}
public Throwable getCause()
{
return cause;
}
public static Throwable getCause(CloseStatus status)
{
if (status instanceof AbnormalCloseStatus)
return ((AbnormalCloseStatus)status).getCause();
return null;
}
@Override
public String toString()
{
return "Abnormal" + super.toString() + ":" + cause;
} }
} }

View File

@ -147,6 +147,7 @@ public class WebSocketChannelState
switch (_channelState) switch (_channelState)
{ {
case CONNECTED:
case OPEN: case OPEN:
_channelState = State.OSHUT; _channelState = State.OSHUT;
return false; return false;

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.io.*;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -25,22 +33,6 @@ import java.util.Objects;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
/** /**
* Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket * Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket
*/ */
@ -185,7 +177,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onIdleExpired()"); LOG.debug("onIdleExpired()");
// treat as a handler error because socket is still open // treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout")); channel.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout"),Callback.NOOP);
return true; return true;
} }
@ -201,7 +193,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onReadTimeout()"); LOG.debug("onReadTimeout()");
// treat as a handler error because socket is still open // treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout)); channel.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout),Callback.NOOP);
return false; return false;
} }
@ -241,7 +233,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
referenced.release(); referenced.release();
// notify session & endpoint // notify session & endpoint
channel.processHandlerError(cause); channel.processHandlerError(cause,NOOP);
} }
}); });
} }
@ -453,7 +445,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.warn(t.toString()); LOG.warn(t.toString());
BufferUtil.clear(networkBuffer.getBuffer()); BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer(); releaseNetworkBuffer();
channel.processConnectionError(t); channel.processConnectionError(t,Callback.NOOP);
} }
} }
@ -494,8 +486,8 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onOpen() {}", this); LOG.debug("onOpen() {}", this);
// Open Channel // Open Channel
channel.onOpen();
super.onOpen(); super.onOpen();
channel.onOpen();
} }
@Override @Override
@ -615,7 +607,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
public void onCompleteFailure(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.onCompleteFailure(x); super.onCompleteFailure(x);
channel.processConnectionError(x); channel.processConnectionError(x,NOOP);
} }
} }
} }

View File

@ -35,7 +35,7 @@ import static org.eclipse.jetty.websocket.core.OpCode.PONG;
* NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some. * NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some.
* </p> * </p>
*/ */
public class AbstractTestFrameHandler implements FrameHandler public class AbstractTestFrameHandler implements FrameHandler.Adaptor
{ {
private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class); private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class);
private byte partial = OpCode.UNDEFINED; private byte partial = OpCode.UNDEFINED;

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.util.log.Logger;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
public class TestFrameHandler implements FrameHandler public class TestFrameHandler implements FrameHandler.Adaptor
{ {
private static Logger LOG = Log.getLogger(TestFrameHandler.class); private static Logger LOG = Log.getLogger(TestFrameHandler.class);
@ -47,7 +47,7 @@ public class TestFrameHandler implements FrameHandler
} }
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession)
{ {
LOG.info("onOpen {}", coreSession); LOG.info("onOpen {}", coreSession);
this.session = coreSession; this.session = coreSession;
@ -69,7 +69,7 @@ public class TestFrameHandler implements FrameHandler
} }
@Override @Override
public void onError(Throwable cause) throws Exception public void onError(Throwable cause)
{ {
LOG.info("onError {} ", cause == null?null:cause.toString()); LOG.info("onError {} ", cause == null?null:cause.toString());
} }

View File

@ -18,11 +18,6 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -44,14 +39,16 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
* Tests of a core server with a fake client * Tests of a core server with a fake client
@ -186,8 +183,6 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void serverFailClose_ISHUT() throws Exception public void serverFailClose_ISHUT() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{ {
setup(State.ISHUT); setup(State.ISHUT);
server.handler.receivedCallback.poll().failed(new Exception("test failure")); server.handler.receivedCallback.poll().failed(new Exception("test failure"));
@ -199,7 +194,6 @@ public class WebSocketCloseTest extends WebSocketTester
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS)); assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
} }
}
@Test @Test
public void clientClose_OSHUT() throws Exception public void clientClose_OSHUT() throws Exception
@ -280,8 +274,6 @@ public class WebSocketCloseTest extends WebSocketTester
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.close(); server.close();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
@ -321,8 +313,6 @@ public class WebSocketCloseTest extends WebSocketTester
client.close(); client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.close(); server.close();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
@ -362,7 +352,7 @@ public class WebSocketCloseTest extends WebSocketTester
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames")); assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
} }
static class TestFrameHandler implements FrameHandler static class TestFrameHandler implements FrameHandler.Adaptor
{ {
private CoreSession session; private CoreSession session;
String state; String state;
@ -414,7 +404,7 @@ public class WebSocketCloseTest extends WebSocketTester
} }
@Override @Override
public void onError(Throwable cause) throws Exception public void onError(Throwable cause)
{ {
LOG.info("onError {} ", cause == null?null:cause.toString()); LOG.info("onError {} ", cause == null?null:cause.toString());
state = session.toString(); state = session.toString();

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.function.Consumer;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -32,18 +29,26 @@ import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.internal.Parser; import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.*;
/** /**
* Tests of a core server with a fake client * Tests of a core server with a fake client
@ -53,6 +58,7 @@ public class WebSocketOpenTest extends WebSocketTester
private static Logger LOG = Log.getLogger(WebSocketOpenTest.class); private static Logger LOG = Log.getLogger(WebSocketOpenTest.class);
private WebSocketServer server; private WebSocketServer server;
private TestFrameHandler serverHandler;
private Socket client; private Socket client;
@AfterEach @AfterEach
@ -62,49 +68,174 @@ public class WebSocketOpenTest extends WebSocketTester
server.stop(); server.stop();
} }
public void setup(Consumer<FrameHandler.CoreSession> onOpen) throws Exception public void setup(BiFunction<FrameHandler.CoreSession,Callback,Void> onOpen) throws Exception
{ {
serverHandler = new TestFrameHandler(onOpen);
TestFrameHandler serverHandler = new TestFrameHandler(onOpen);
server = new WebSocketServer(0, serverHandler); server = new WebSocketServer(0, serverHandler);
server.start(); server.start();
client = newClient(server.getLocalPort()); client = newClient(server.getLocalPort());
assertThat(server.handler.getCoreSession().toString(), containsString("OPEN"));
LOG.info("Server: OPEN");
} }
@Test @Test
public void testSendFrameInOnOpen() throws Exception public void testSendFrameInOnOpen() throws Exception
{ {
setup(s->TestFrameHandler.sendText(s,"Hello")); setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
TestFrameHandler.sendText(s,"Hello", c);
s.demand(1);
return null;
});
Parser.ParsedFrame frame = receiveFrame(client.getInputStream()); Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello")); assertThat(frame.getPayloadAsUTF8(),is("Hello"));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.NORMAL));
}
@Test
public void testFailureInOnOpen() throws Exception
{
try(StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{
setup((s, c) ->
{
assertThat(s.toString(), containsString("CONNECTED"));
c.failed(new Exception("Test Exception in onOpen"));
return null;
});
assertTrue(server.handler.onError.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.error, notNullValue());
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
} }
static class TestFrameHandler implements FrameHandler @Test
public void testCloseInOnOpen() throws Exception
{
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
s.close(CloseStatus.SHUTDOWN,"Test close in onOpen", c);
s.demand(1);
return null;
});
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.SHUTDOWN));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
}
@Test
public void testAsyncOnOpen() throws Exception
{
Exchanger<FrameHandler.CoreSession> sx = new Exchanger<>();
Exchanger<Callback> cx = new Exchanger<>();
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
try
{
sx.exchange(s);
cx.exchange(c);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return null;
});
FrameHandler.CoreSession session = sx.exchange(null);
Callback onOpenCallback = cx.exchange(null);
Thread.sleep(100);
// Can send while onOpen is active
TestFrameHandler.sendText(session,"Hello", NOOP);
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello"));
// But cannot receive
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
// Can't demand until open
assertThrows(Throwable.class, () -> session.demand(1));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
// Succeeded moves to OPEN state and still does not read CLOSE frame
onOpenCallback.succeeded();
assertThat(session.toString(),containsString("OPEN"));
// Demand start receiving frames
session.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
// Closed handled normally
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.NORMAL));
}
static class TestFrameHandler implements FrameHandler.Adaptor
{ {
private CoreSession session; private CoreSession session;
private Consumer<CoreSession> onOpen; private BiFunction<CoreSession,Callback,Void> onOpen;
private CloseStatus closeStatus;
private CountDownLatch onClosed = new CountDownLatch(1);
private Throwable error;
private CountDownLatch onError = new CountDownLatch(1);
private Frame frame;
private CountDownLatch onFrame = new CountDownLatch(1);
public CoreSession getCoreSession() public CoreSession getCoreSession()
{
synchronized (this)
{ {
return session; return session;
} }
}
TestFrameHandler(Consumer<CoreSession> onOpen) TestFrameHandler(BiFunction<CoreSession,Callback,Void> onOpen)
{ {
this.onOpen = onOpen; this.onOpen = onOpen;
} }
@Override @Override
public void onOpen(CoreSession coreSession) public void onOpen(CoreSession coreSession, Callback callback)
{ {
LOG.info("onOpen {}", coreSession); LOG.info("onOpen {}", coreSession);
synchronized (this)
{
session = coreSession; session = coreSession;
onOpen.accept(coreSession); }
onOpen.apply(coreSession, callback);
} }
@Override @Override
@ -112,18 +243,31 @@ public class WebSocketOpenTest extends WebSocketTester
{ {
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload())); LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
callback.succeeded(); callback.succeeded();
if (onFrame.getCount()==1)
{
this.frame = frame;
onFrame.countDown();
}
}
@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
if (onError.getCount()!=1)
throw new IllegalStateException();
error = cause;
onError.countDown();
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onClosed(CloseStatus closeStatus)
{ {
LOG.info("onClosed {}", closeStatus); LOG.info("onClosed {}", closeStatus);
} if (onClosed.getCount()!=1)
throw new IllegalStateException();
@Override this.closeStatus = closeStatus;
public void onError(Throwable cause) throws Exception onClosed.countDown();
{
LOG.info("onError {} ", cause == null?null:cause.toString());
} }
@Override @Override
@ -136,17 +280,25 @@ public class WebSocketOpenTest extends WebSocketTester
{ {
sendText(session, text); sendText(session, text);
} }
public void sendText(String text, Callback callback)
{
sendText(session, text, callback);
}
static void sendText(FrameHandler.CoreSession session, String text) static void sendText(FrameHandler.CoreSession session, String text)
{
sendText(session, text, NOOP);
}
static void sendText(FrameHandler.CoreSession session, String text, Callback callback)
{ {
Frame frame = new Frame(OpCode.TEXT); Frame frame = new Frame(OpCode.TEXT);
frame.setFin(true); frame.setFin(true);
frame.setPayload(text); frame.setPayload(text);
session.sendFrame(frame, NOOP, false); session.sendFrame(frame, callback, false);
} }
} }
static class WebSocketServer extends AbstractLifeCycle static class WebSocketServer extends AbstractLifeCycle

View File

@ -92,7 +92,7 @@ public class ChatWebSocketServer
} }
@Override @Override
public void onClosed(CloseStatus closeStatus) public void onClosed(CloseStatus closeStatus) throws Exception
{ {
LOG.debug("onClosed {}", closeStatus); LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus); super.onClosed(closeStatus);

View File

@ -18,12 +18,6 @@
package org.eclipse.jetty.websocket.core.server; package org.eclipse.jetty.websocket.core.server;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
@ -38,28 +32,21 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.*;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.RawFrameBuilder;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.TestWebSocketNegotiator;
import org.eclipse.jetty.websocket.core.TestWebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.WebSocketTester;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.*;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
* Tests of a core server with a fake client * Tests of a core server with a fake client
@ -162,9 +149,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler() TestFrameHandler serverHandler = new TestFrameHandler()
{ {
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
super.onOpen(coreSession); super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(1); coreSession.demand(1);
} }
@ -291,9 +279,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler() TestFrameHandler serverHandler = new TestFrameHandler()
{ {
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
super.onOpen(coreSession); super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(3); coreSession.demand(3);
} }
@ -349,9 +338,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler() TestFrameHandler serverHandler = new TestFrameHandler()
{ {
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
super.onOpen(coreSession); super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2); coreSession.demand(2);
} }
@ -423,9 +413,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler() TestFrameHandler serverHandler = new TestFrameHandler()
{ {
@Override @Override
public void onOpen(CoreSession coreSession) throws Exception public void onOpen(CoreSession coreSession, Callback callback)
{ {
super.onOpen(coreSession); super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2); coreSession.demand(2);
} }