Merge pull request #3291 from eclipse/jetty-10.0.x-3290-websocket-onOpen

Issue #3290 async websocket onOpen, onError and onClose
This commit is contained in:
Greg Wilkins 2019-01-30 12:05:09 +11:00 committed by GitHub
commit 58b73c10af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1325 additions and 665 deletions

View File

@ -876,6 +876,6 @@ public class HttpRequest implements Request
@Override
public String toString()
{
return String.format("%s[%s %s %s]@%x", HttpRequest.class.getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
return String.format("%s[%s %s %s]@%x", this.getClass().getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
}
}

View File

@ -111,6 +111,12 @@ public interface Callback extends Invocable
};
}
/**
* Create a callback from the passed success and failure
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(Runnable success, Consumer<Throwable> failure)
{
return new Callback()
@ -129,6 +135,10 @@ public interface Callback extends Invocable
};
}
/** Creaste a callback that runs completed when it succeeds or fails
* @param completed The completion to run on success or failure
* @return a new callback
*/
static Callback from(Runnable completed)
{
return new Completing()
@ -140,6 +150,67 @@ public interface Callback extends Invocable
};
}
/**
* Create a nested callback that runs completed after
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run after the nested callback is completed
* @return a new callback.
*/
static Callback from(Callback callback, Runnable completed)
{
return new Nested(callback)
{
public void completed()
{
completed.run();
}
};
}
/**
* Create a nested callback that runs completed before
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run before the nested callback is completed. Any exceptions thrown
* from completed will result in a callback failure.
* @return a new callback.
*/
static Callback from(Runnable completed, Callback callback)
{
return new Callback()
{
@Override
public void succeeded()
{
try
{
completed.run();
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
@Override
public void failed(Throwable x)
{
try
{
completed.run();
}
catch(Throwable t)
{
x.addSuppressed(t);
}
callback.failed(x);
}
};
}
class Completing implements Callback
{
@Override
@ -158,7 +229,11 @@ public interface Callback extends Invocable
{
}
}
/**
* Nested Completing Callback that completes after
* completing the nested callback
*/
class Nested extends Completing
{
private final Callback callback;

View File

@ -56,8 +56,7 @@ public class ClientUpgradeRequestImpl extends org.eclipse.jetty.websocket.core.c
UpgradeRequest upgradeRequest = new DelegatedClientUpgradeRequest(this);
UpgradeResponse upgradeResponse = new DelegatedClientUpgradeResponse(response);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo,
upgradeRequest, upgradeResponse, futureJavaxSession);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureJavaxSession);
return frameHandler;
}

View File

@ -24,6 +24,8 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.websocket.ClientEndpoint;
@ -171,8 +173,14 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
try
{
Future<Session> sessionFuture = connect(upgradeRequest);
// TODO: apply connect timeouts here?
return sessionFuture.get(); // TODO: unwrap IOException from ExecutionException?
long timeout = coreClient.getHttpClient().getConnectTimeout();
if (timeout>0)
return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS);
return sessionFuture.get();
}
catch (TimeoutException e)
{
throw new IOException("Connection future not completed " + destURI, e);
}
catch (Exception e)
{

View File

@ -192,95 +192,55 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
}
@Override
public void onClosed(CloseStatus closeStatus)
public void onOpen(CoreSession coreSession, Callback callback)
{
if (closeHandle != null)
{
try
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause);
}
}
container.removeBean(session);
}
@SuppressWarnings("Duplicates")
@Override
public void onError(Throwable cause)
{
futureSession.completeExceptionally(cause);
if (errorHandle == null)
{
LOG.warn("Unhandled Error: " + endpointInstance, cause);
return;
}
try
{
errorHandle.invoke(cause);
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
throw wsError;
}
}
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
@Override
public void onOpen(CoreSession coreSession) throws Exception
{
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
errorHandle = InvokerUtils.bindTo(errorHandle, session);
openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
errorHandle = InvokerUtils.bindTo(errorHandle, session);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
pongHandle = InvokerUtils.bindTo(pongHandle, session);
pongHandle = InvokerUtils.bindTo(pongHandle, session);
if (actualTextMetadata != null)
{
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
textMetadata = actualTextMetadata;
}
if (actualBinaryMetadata != null)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
binaryMetadata = actualBinaryMetadata;
}
if (openHandle != null)
{
try
if (actualTextMetadata != null)
{
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
textMetadata = actualTextMetadata;
}
if (actualBinaryMetadata != null)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
binaryMetadata = actualBinaryMetadata;
}
if (openHandle != null)
openHandle.invoke();
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
}
}
container.addBean(session, true);
futureSession.complete(session);
container.addBean(session, true);
futureSession.complete(session);
callback.succeeded();
}
catch (Throwable cause)
{
Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
// TODO This feels like double handling of the exception? Review need for futureSession
futureSession.completeExceptionally(wse);
callback.failed(wse);
}
}
@Override
@ -314,6 +274,50 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
dataType = OpCode.UNDEFINED;
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
if (closeHandle != null)
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
container.removeBean(session);
callback.succeeded();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause));
}
}
@Override
public void onError(Throwable cause, Callback callback)
{
try
{
futureSession.completeExceptionally(cause);
if (errorHandle != null)
errorHandle.invoke(cause);
else
LOG.warn("Unhandled Error: " + endpointInstance, cause);
callback.succeeded();
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
// TODO should futureSession be failed here?
}
}
public Set<MessageHandler> getMessageHandlers()
{
if (messageHandlerMap.isEmpty())

View File

@ -220,10 +220,10 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
{
getBasicRemote().sendObject(obj);
}
catch (Throwable cause)
catch (Exception cause)
{
// TODO: need way to fail Channel.
frameHandler.onError(cause);
// TODO review this
throw new RuntimeException(cause);
}
}
}

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.javax.common;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
@ -25,12 +32,6 @@ import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@ -44,12 +45,12 @@ public class JavaxWebSocketFrameHandler_OnCloseTest extends AbstractJavaxWebSock
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal");
Frame closeFrame = status.toFrame();
localEndpoint.onFrame(closeFrame, Callback.from(() ->
{
localEndpoint.onClosed(status);
localEndpoint.onClosed(status, Callback.NOOP);
}, t ->
{
throw new RuntimeException(t);

View File

@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnError;
import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@ -40,8 +42,8 @@ public class JavaxWebSocketFrameHandler_OnErrorTest extends AbstractJavaxWebSock
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onError(new RuntimeException("From Testcase"));
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onError(new RuntimeException("From Testcase"), Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher);
}

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame;
@ -25,14 +34,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -44,7 +45,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest extends Abstr
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint);

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@ -47,7 +48,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryTest extends AbstractJav
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
assertThat("Has Binary Metadata", localEndpoint.getBinaryMetadata(), notNullValue());

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame;
@ -25,13 +33,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -43,7 +44,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint);

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@ -46,7 +47,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextTest extends AbstractJavax
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload(payload).setFin(true), Callback.NOOP);

View File

@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@ -38,7 +40,7 @@ public class JavaxWebSocketFrameHandler_OnOpenTest extends AbstractJavaxWebSocke
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher);
}

View File

@ -18,19 +18,6 @@
package org.eclipse.jetty.websocket.javax.server;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import javax.websocket.EndpointConfig;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.io.ByteBufferPool;
@ -51,6 +38,19 @@ import org.eclipse.jetty.websocket.javax.server.internal.JavaxWebSocketCreator;
import org.eclipse.jetty.websocket.javax.server.internal.UndefinedServerEndpointConfig;
import org.eclipse.jetty.websocket.servlet.WebSocketMapping;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import javax.websocket.EndpointConfig;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@ManagedObject("JSR356 Server Container")
public class JavaxWebSocketServerContainer
extends JavaxWebSocketClientContainer
@ -310,9 +310,10 @@ public class JavaxWebSocketServerContainer
@Override
public int getDefaultMaxBinaryMessageBufferSize()
{
// TODO: warn on long -> int conversion issue
// TODO: Should this be Filter?
return (int)customizer.getMaxBinaryMessageSize();
long max = customizer.getMaxBinaryMessageSize();
if (max > (long)Integer.MAX_VALUE)
return Integer.MAX_VALUE;
return (int)max;
}
@Override
@ -324,8 +325,10 @@ public class JavaxWebSocketServerContainer
@Override
public int getDefaultMaxTextMessageBufferSize()
{
// TODO: warn on long -> int conversion issue
return (int)customizer.getMaxTextMessageSize();
long max = customizer.getMaxTextMessageSize();
if (max > (long)Integer.MAX_VALUE)
return Integer.MAX_VALUE;
return (int)max;
}
@Override

View File

@ -229,32 +229,34 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.endPoint = endPoint;
}
@Override
public void onClosed(CloseStatus closeStatus)
{
}
@Override
public void onError(Throwable cause) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
this.session = coreSession;
callback.succeeded();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
receivedFrames.offer(Frame.copy(frame));
synchronized(this)
{
callback.succeeded();
}
callback.succeeded();
}
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onError(Throwable cause, Callback callback)
{
this.session = coreSession;
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
callback.succeeded();
}
public void writeRaw(ByteBuffer buffer) throws IOException
{
synchronized (this)

View File

@ -32,9 +32,10 @@ public class FrameEcho implements FrameHandler
private CoreSession coreSession;
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
this.coreSession = coreSession;
callback.succeeded();
}
@Override
@ -47,15 +48,18 @@ public class FrameEcho implements FrameHandler
}
@Override
public void onClosed(CloseStatus closeStatus)
{
coreSession = null;
}
@Override
public void onError(Throwable cause) throws Exception
public void onError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
coreSession = null;
callback.succeeded();
}
}

View File

@ -18,17 +18,17 @@
package org.eclipse.jetty.websocket.javax.tests.framehandlers;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.MessageHandler;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.MessageHandler;
public class FrameHandlerTracker extends MessageHandler
{
public CountDownLatch openLatch = new CountDownLatch(1);
@ -45,26 +45,9 @@ public class FrameHandlerTracker extends MessageHandler
}
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession);
openLatch.countDown();
}
@Override
public void onClosed(CloseStatus closeStatus)
{
super.onClosed(closeStatus);
closeDetail.compareAndSet(null, closeStatus);
closeLatch.countDown();
}
@Override
public void onError(Throwable cause) throws Exception
{
super.onError(cause);
error.compareAndSet(null, cause);
super.onOpen(coreSession, Callback.from(callback,()->openLatch.countDown()));
}
@Override
@ -80,4 +63,21 @@ public class FrameHandlerTracker extends MessageHandler
bufferQueue.offer(BufferUtil.copy(wholeMessage));
callback.succeeded();
}
@Override
public void onError(Throwable cause, Callback callback)
{
super.onError(cause, Callback.from(callback, ()-> error.compareAndSet(null, cause)));
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
super.onClosed(closeStatus, Callback.from(callback,()->
{
closeDetail.compareAndSet(null, closeStatus);
closeLatch.countDown();
}));
}
}

View File

@ -18,6 +18,23 @@
package org.eclipse.jetty.websocket.javax.tests.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
@ -28,31 +45,15 @@ import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.eclipse.jetty.websocket.javax.tests.CoreServer;
import org.eclipse.jetty.websocket.javax.tests.DataUtils;
import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@ -292,9 +293,10 @@ public class MessageReceivingTest
}
@Override
public void onError(Throwable cause)
public void onError(Throwable cause, Callback callback)
{
LOG.warn(cause);
callback.succeeded();
}
}
@ -323,9 +325,10 @@ public class MessageReceivingTest
}
@Override
public void onError(Throwable cause)
public void onError(Throwable cause, Callback callback)
{
LOG.warn(cause);
callback.succeeded();
}
}

View File

@ -30,19 +30,19 @@ import javax.websocket.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@ -112,11 +112,11 @@ public class OnCloseTest
CompletableFuture<Session> futureSession = new CompletableFuture<>();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Execute onClose call
frameHandler.onFrame(CloseStatus.toFrame(CloseStatus.NORMAL), Callback.NOOP);
frameHandler.onClosed(CloseStatus.NORMAL_STATUS);
frameHandler.onClosed(CloseStatus.NORMAL_STATUS, Callback.NOOP);
// Test captured event
BlockingQueue<String> events = endpoint.events;

View File

@ -33,12 +33,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.MessageType;
import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory;
import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
@ -47,20 +44,20 @@ import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory;
import org.eclipse.jetty.websocket.javax.tests.MessageType;
import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class SessionAddMessageHandlerTest
{
@ -88,7 +85,7 @@ public class SessionAddMessageHandlerTest
JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container);
CompletableFuture<Session> futureSession = new CompletableFuture<>();
frameHandler = frameHandlerFactory.newJavaxFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Session
session = frameHandler.getSession();

View File

@ -18,13 +18,6 @@
package org.eclipse.jetty.websocket.javax.tests.client.misbehaving;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.javax.tests.CoreServer;
@ -32,6 +25,12 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@ -99,7 +98,7 @@ public class MisbehavingClassTest
Exception e = assertThrows(IOException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()");
assertThat(e.getCause(), instanceOf(ExecutionException.class));
assertThat("Close should have occurred", socket.closeLatch.await(1, TimeUnit.SECONDS), is(true));
assertThat("Close should have occurred", socket.closeLatch.await(5, TimeUnit.SECONDS), is(true));
Throwable cause = socket.errors.pop();
assertThat("Error", cause, instanceOf(RuntimeException.class));

View File

@ -35,12 +35,12 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
@ -57,7 +57,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac
// Establish endpoint function
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
func.accept(frameHandler);
return socket;
}

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import java.net.HttpCookie;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
/**
* The HTTP Upgrade to WebSocket Request
*/

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.common;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -38,11 +43,6 @@ import org.eclipse.jetty.websocket.core.UpgradeException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class JettyWebSocketFrameHandler implements FrameHandler
{
private final Logger log;
@ -114,63 +114,47 @@ public class JettyWebSocketFrameHandler implements FrameHandler
}
@Override
public void onClosed(CloseStatus closeStatus)
public void onOpen(CoreSession coreSession, Callback callback)
{
// TODO: FrameHandler cleanup?
}
@SuppressWarnings("Duplicates")
@Override
public void onError(Throwable cause)
{
cause = convertCause(cause);
futureSession.completeExceptionally(cause);
if (errorHandle == null)
{
log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
if (log.isDebugEnabled())
log.debug("unhandled", cause);
return;
}
try
{
errorHandle.invoke(cause);
customizer.customize(coreSession);
session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
if (textHandle != null)
textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
if (binaryHandle != null)
binarySink = JettyWebSocketFrameHandlerFactory
.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
if (openHandle != null)
openHandle.invoke();
futureSession.complete(session);
callback.succeeded();
}
catch (Throwable t)
catch (Throwable cause)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
throw wsError;
// TODO should futureSession be failed here?
callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause));
}
}
public static Throwable convertCause(Throwable cause)
{
if (cause instanceof MessageTooLargeException)
return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
if (cause instanceof ProtocolException)
return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
if (cause instanceof BadPayloadException)
return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
if (cause instanceof CloseException)
return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
if (cause instanceof WebSocketTimeoutException)
return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
if (cause instanceof InvalidSignatureException)
return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
if (cause instanceof UpgradeException)
return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
return cause;
}
/**
* @see #onFrame(Frame,Callback)
*/
public final void onFrame(Frame frame) {}
@Override
public void onFrame(Frame frame, Callback callback)
@ -191,61 +175,56 @@ public class JettyWebSocketFrameHandler implements FrameHandler
switch (frame.getOpCode())
{
case OpCode.CLOSE:
onClose(frame, callback);
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPing(frame, callback);
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPong(frame, callback);
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
onText(frame, callback);
onTextFrame(frame, callback);
break;
case OpCode.BINARY:
onBinary(frame, callback);
onBinaryFrame(frame, callback);
break;
case OpCode.CONTINUATION:
onContinuation(frame, callback);
onContinuationFrame(frame, callback);
break;
}
}
@Override
public void onOpen(CoreSession coreSession)
public void onError(Throwable cause, Callback callback)
{
customizer.customize(coreSession);
session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
if (textHandle != null)
textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
if (binaryHandle != null)
binarySink = JettyWebSocketFrameHandlerFactory.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
if (openHandle != null)
try
{
try
{
openHandle.invoke();
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
}
}
cause = convertCause(cause);
futureSession.completeExceptionally(cause);
futureSession.complete(session);
if (errorHandle != null)
errorHandle.invoke(cause);
else
{
log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
if (log.isDebugEnabled())
log.debug("unhandled", cause);
}
callback.succeeded();
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
}
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
callback.succeeded();
}
public String toString()
@ -265,7 +244,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
activeMessageSink = null;
}
private void onBinary(Frame frame, Callback callback)
private void onBinaryFrame(Frame frame, Callback callback)
{
if (activeMessageSink == null)
activeMessageSink = binarySink;
@ -273,7 +252,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
acceptMessage(frame, callback);
}
private void onClose(Frame frame, Callback callback)
private void onCloseFrame(Frame frame, Callback callback)
{
if (closeHandle != null)
{
@ -290,12 +269,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler
callback.succeeded();
}
private void onContinuation(Frame frame, Callback callback)
private void onContinuationFrame(Frame frame, Callback callback)
{
acceptMessage(frame, callback);
}
private void onPing(Frame frame, Callback callback)
private void onPingFrame(Frame frame, Callback callback)
{
if (pingHandle != null)
{
@ -323,7 +302,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
callback.succeeded();
}
private void onPong(Frame frame, Callback callback)
private void onPongFrame(Frame frame, Callback callback)
{
if (pongHandle != null)
{
@ -343,11 +322,39 @@ public class JettyWebSocketFrameHandler implements FrameHandler
callback.succeeded();
}
private void onText(Frame frame, Callback callback)
private void onTextFrame(Frame frame, Callback callback)
{
if (activeMessageSink == null)
activeMessageSink = textSink;
acceptMessage(frame, callback);
}
static Throwable convertCause(Throwable cause)
{
if (cause instanceof MessageTooLargeException)
return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
if (cause instanceof ProtocolException)
return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
if (cause instanceof BadPayloadException)
return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
if (cause instanceof CloseException)
return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
if (cause instanceof WebSocketTimeoutException)
return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
if (cause instanceof InvalidSignatureException)
return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
if (cause instanceof UpgradeException)
return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
return cause;
}
}

View File

@ -119,7 +119,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello?").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@ -163,7 +163,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello Text Stream").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@ -185,7 +185,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hel").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("lo ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("Wor").setFin(false), Callback.NOOP);
@ -208,7 +208,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -238,7 +238,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -264,10 +264,10 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onError(new RuntimeException("Nothing to see here"));
localEndpoint.onError(new RuntimeException("Nothing to see here"), Callback.NOOP);
// Validate Events
socket.events.assertEvents(
@ -284,7 +284,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@ -314,7 +314,7 @@ public class JettyWebSocketFrameHandlerTest
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.PING).setPayload("You there?"), Callback.NOOP);

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
/**
* Representation of a WebSocket Close (status code &amp; reason)
*/
@ -355,5 +355,11 @@ public class CloseStatus
{
return CloseStatus.this;
}
@Override
public String toString()
{
return super.toString() + ":" + CloseStatus.this.toString();
}
}
}

View File

@ -49,33 +49,31 @@ import org.eclipse.jetty.websocket.core.server.Negotiation;
* Once instantiated the FrameHandler follows is used as follows:
* </p>
* <ul>
* <li>The {@link #onOpen(CoreSession)} method is called when negotiation of the connection is completed. The passed {@link CoreSession} instance is used
* <li>The {@link #onOpen(CoreSession,Callback)} method is called when negotiation of the connection is completed. The passed {@link CoreSession} instance is used
* to obtain information about the connection and to send frames</li>
* <li>Every data and control frame received is passed to {@link #onFrame(Frame, Callback)}.</li>
* <li>Received Control Frames that require a response (eg Ping, Close) are first passed to the {@link #onFrame(Frame, Callback)} to give the
* Application an opportunity to send the response itself. If an appropriate response has not been sent when the callback passed is completed, then a
* response will be generated.</li>
* <li>If an error is detected or received, then {@link #onError(Throwable)} will be called to inform the application of the cause of the problem.
* The connection will then be closed or aborted and the {@link #onClosed(CloseStatus)} method called.</li>
* <li>The {@link #onClosed(CloseStatus)} method is always called once a websocket connection is terminated, either gracefully or not. The error code
* <li>If an error is detected or received, then {@link #onError(Throwable,Callback)} will be called to inform the application of the cause of the problem.
* The connection will then be closed or aborted and the {@link #onClosed(CloseStatus,Callback)} method called.</li>
* <li>The {@link #onClosed(CloseStatus,Callback)} method is always called once a websocket connection is terminated, either gracefully or not. The error code
* will indicate the nature of the close.</li>
* </ul>
*/
public interface FrameHandler extends IncomingFrames
{
// TODO: have conversation about "throws Exception" vs "throws WebSocketException" vs "throws Throwable" in below signatures.
/**
* Connection is being opened.
* Async notification that Connection is being opened.
* <p>
* FrameHandler can write during this call, but will not receive frames until
* the onOpen() completes.
* </p>
*
* @param coreSession the channel associated with this connection.
* @throws Exception if unable to open. TODO: will close the connection (optionally choosing close status code based on WebSocketException type)?
* @param callback the callback to indicate success in processing (or failure)
*/
void onOpen(CoreSession coreSession) throws Exception;
void onOpen(CoreSession coreSession, Callback callback);
/**
* Receiver of all Frames.
@ -92,6 +90,16 @@ public interface FrameHandler extends IncomingFrames
*/
void onFrame(Frame frame, Callback callback);
/**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
* derived from the error.
*
* @param cause the reason for the error
* @param callback the callback to indicate success in processing (or failure)
*/
void onError(Throwable cause, Callback callback);
/**
* This is the Close Handshake Complete event.
* <p>
@ -100,18 +108,10 @@ public interface FrameHandler extends IncomingFrames
* </p>
*
* @param closeStatus the close status received from remote, or in the case of abnormal closure from local.
* @param callback the callback to indicate success in processing (or failure)
*/
void onClosed(CloseStatus closeStatus);
void onClosed(CloseStatus closeStatus, Callback callback);
/**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus)} giving the close status
* derived from the error.
*
* @param cause the reason for the error
* @throws Exception if unable to process the error.
*/
void onError(Throwable cause) throws Exception;
/**
* Does the FrameHandler manage it's own demand?
@ -125,7 +125,6 @@ public interface FrameHandler extends IncomingFrames
return false;
}
interface Configuration
{
@ -168,6 +167,7 @@ public interface FrameHandler extends IncomingFrames
void setMaxTextMessageSize(long maxSize);
}
/**
* Represents the outgoing Frames.
*/
@ -217,22 +217,6 @@ public interface FrameHandler extends IncomingFrames
*/
boolean isSecure();
/**
* Issue a harsh abort of the underlying connection.
* <p>
* This will terminate the connection, without sending a websocket close frame.
* No WebSocket Protocol close handshake will be performed.
* </p>
* <p>
* Once called, any read/write activity on the websocket from this point will be indeterminate.
* This can result in the {@link #onError(Throwable)} event being called indicating any issue that arises.
* </p>
* <p>
* Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus)} event will be called.
* </p>
*/
void abort();
/**
* @return Client or Server behaviour
*/
@ -295,6 +279,22 @@ public interface FrameHandler extends IncomingFrames
*/
void close(int statusCode, String reason, Callback callback);
/**
* Issue a harsh abort of the underlying connection.
* <p>
* This will terminate the connection, without sending a websocket close frame.
* No WebSocket Protocol close handshake will be performed.
* </p>
* <p>
* Once called, any read/write activity on the websocket from this point will be indeterminate.
* This can result in the {@link #onError(Throwable,Callback)} event being called indicating any issue that arises.
* </p>
* <p>
* Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus,Callback)} event will be called.
* </p>
*/
void abort();
/**
* Manage flow control by indicating demand for handling Frames. A call to
* {@link FrameHandler#onFrame(Frame, Callback)} will only be made if a
@ -501,7 +501,7 @@ public interface FrameHandler extends IncomingFrames
@Override
public Duration getIdleTimeout()
{
return timeout;
return timeout==null ? Duration.ZERO : timeout;
}
@Override

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.websocket.core;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
@ -26,10 +30,6 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
/**
* A utility implementation of FrameHandler that defragments
* text frames into a String message before calling {@link #onText(String, Callback)}.
@ -128,17 +128,18 @@ public class MessageHandler implements FrameHandler
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
@Override
public void onOpen(CoreSession coreSession) throws Exception
{
this.coreSession = coreSession;
}
public CoreSession getCoreSession()
{
return coreSession;
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
this.coreSession = coreSession;
callback.succeeded();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
@ -229,6 +230,40 @@ public class MessageHandler implements FrameHandler
}
}
@Override
public void onError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus);
if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
if (binaryMessage != null)
{
if (BufferUtil.hasContent(binaryMessage))
LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
}
if (utf8StringBuilder != null)
{
utf8StringBuilder.reset();
utf8StringBuilder = null;
}
coreSession = null;
callback.succeeded();
}
private void onTextFrame(Frame frame, Callback callback)
{
if (frame.hasPayload())
@ -298,8 +333,8 @@ public class MessageHandler implements FrameHandler
/**
* Method called when a complete text message is received.
*
* @param message
* @param callback
* @param message the received text payload
* @param callback The callback to signal completion of handling.
*/
protected void onText(String message, Callback callback)
{
@ -309,8 +344,8 @@ public class MessageHandler implements FrameHandler
/**
* Method called when a complete binary message is received.
*
* @param message
* @param callback
* @param message The binary payload
* @param callback The callback to signal completion of handling.
*/
protected void onBinary(ByteBuffer message, Callback callback)
{
@ -423,36 +458,4 @@ public class MessageHandler implements FrameHandler
}
}.iterate();
}
@Override
public void onClosed(CloseStatus closeStatus)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus);
if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
if (binaryMessage != null)
{
if (BufferUtil.hasContent(binaryMessage))
LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
}
if (utf8StringBuilder != null)
{
utf8StringBuilder.reset();
utf8StringBuilder = null;
}
coreSession = null;
}
@Override
public void onError(Throwable cause) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
}
}

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.core.client;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpRequest;
@ -54,16 +64,6 @@ import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
public abstract class UpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpConnectionUpgrader
{
public static UpgradeRequest from(WebSocketCoreClient webSocketClient, URI requestURI, FrameHandler frameHandler)
@ -344,9 +344,14 @@ public abstract class UpgradeRequest extends HttpRequest implements Response.Com
notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));
// Now swap out the connection
endp.upgrade(wsConnection);
futureCoreSession.complete(wsChannel);
try
{
endp.upgrade(wsConnection);
}
finally
{
futureCoreSession.complete(wsChannel);
}
}
/**

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.websocket.core.internal;
import java.io.Closeable;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
@ -30,10 +33,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
/**
* Parsing of a frames in WebSocket land.
*/
@ -415,5 +414,13 @@ public class Parser
{
return releaseable;
}
@Override
public String toString()
{
if (closeStatus==null)
return super.toString();
return super.toString() + ":" + closeStatus;
}
}
}

View File

@ -52,6 +52,8 @@ import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import static org.eclipse.jetty.util.Callback.NOOP;
/**
* The Core WebSocket Session.
*/
@ -262,7 +264,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void close(Callback callback)
{
close(NO_CODE, callback, false);
close(NO_CODE, callback);
}
/**
@ -275,12 +277,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void close(int statusCode, String reason, Callback callback)
{
close(new CloseStatus(statusCode, reason), callback, false);
close(new CloseStatus(statusCode, reason), callback);
}
private void close(CloseStatus closeStatus, Callback callback, boolean batch)
private void close(CloseStatus closeStatus, Callback callback)
{
sendFrame(closeStatus.toFrame(), callback, batch);
sendFrame(closeStatus.toFrame(), callback, false);
}
@Override
@ -293,39 +295,57 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus);
closeConnection(cause, closeStatus, NOOP);
}
public void closeConnection(Throwable cause, CloseStatus closeStatus)
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
{
connection.cancelDemand();
if (connection.getEndPoint().isOpen())
connection.close();
// Forward Errors to Local WebSocket EndPoint
if (cause!=null)
{
Callback errorCallback = Callback.from(()->
{
try
{
handler.onClosed(closeStatus, callback);
}
catch (Throwable e)
{
LOG.warn(e);
callback.failed(e);
}
});
try
{
handler.onError(cause);
handler.onError(cause,errorCallback);
}
catch (Throwable e)
{
if (e != cause)
cause.addSuppressed(e);
LOG.warn(cause);
errorCallback.failed(cause);
}
}
else
{
try
{
handler.onClosed(closeStatus, callback);
}
catch (Throwable e)
{
LOG.warn(e);
callback.failed(e);
}
}
try
{
handler.onClosed(closeStatus);
}
catch (Throwable e)
{
LOG.warn(e);
}
if (connection.getEndPoint().isOpen())
connection.close();
}
AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause)
@ -344,7 +364,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
else
code = CloseStatus.SERVER_ERROR;
return new AbnormalCloseStatus(code, cause.getMessage());
return new AbnormalCloseStatus(code, cause);
}
/**
@ -353,8 +373,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
* otherwise just close the connection.
*
* @param cause the cause
* @param callback the callback on completion of error handling
*/
public void processConnectionError(Throwable cause)
public void processConnectionError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause);
@ -362,9 +383,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, Callback.NOOP, false);
close(closeStatus, NOOP);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus);
closeConnection(cause, closeStatus, callback);
}
/**
@ -372,13 +393,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
* Send an abnormal close frame to ensure connection is closed.
*
* @param cause the cause
* @param callback the callback on completion of error handling
*/
public void processHandlerError(Throwable cause)
public void processHandlerError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), Callback.NOOP, false);
close(abnormalCloseStatusFor(cause), callback);
}
/**
@ -389,43 +411,44 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("onOpen() {}", this);
// Upgrade success
channelState.onConnected();
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED");
Callback openCallback = Callback.from(()->
{
channelState.onOpen();
if (!demanding)
connection.demand(1);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to OPEN");
},
x->
{
LOG.warn("Error during OPEN", x);
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP);
});
try
{
// Upgrade success
channelState.onConnected();
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED");
// Open connection and handler
channelState.onOpen();
handler.onOpen(this);
if (!demanding)
connection.demand(1);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to OPEN");
handler.onOpen(this, openCallback);
}
catch (Throwable t)
{
LOG.warn("Error during OPEN", t);
try
{
handler.onError(t);
}
catch (Exception e)
{
t.addSuppressed(e);
}
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t));
openCallback.failed(t);
}
}
@Override
public void demand(long n)
{
if (!demanding)
throw new IllegalStateException();
throw new IllegalStateException("FrameHandler is not demanding: " + this);
if (!channelState.isInputOpen())
throw new IllegalStateException("FrameHandler input not open: " + this); // TODO Perhaps this is a NOOP?
connection.demand(n);
}
@ -458,9 +481,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
try
{
assertValidOutgoing(frame);
@ -476,44 +496,50 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection);
if (frame.getOpCode() == OpCode.CLOSE)
if (closeConnection)
{
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame));
if (closeConnection)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
closeConnection(null, channelState.getCloseStatus());
}
};
}
Callback closeConnectionCallback = Callback.from(
()->closeConnection(cause, channelState.getCloseStatus(), callback),
x->closeConnection(cause, channelState.getCloseStatus(), Callback.from(
()-> callback.failed(x),
x2->
{
x.addSuppressed(x2);
callback.failed(x);
})));
flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
}
else
{
flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.iterate();
}
catch (Throwable ex)
{
try
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus)
closeConnection(null, closeStatus, Callback.from(
()->callback.failed(ex),
x2->
{
ex.addSuppressed(x2);
callback.failed(ex);
}));
else
callback.failed(ex);
}
else
callback.failed(ex);
}
finally
{
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus)
closeConnection(null, closeStatus);
}
}
}
}
@ -609,7 +635,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private class IncomingAdaptor implements IncomingFrames
{
@Override
public void onFrame(Frame frame, Callback callback)
public void onFrame(Frame frame, final Callback callback)
{
try
{
@ -619,48 +645,41 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
boolean closeConnection = channelState.onIncomingFrame(frame);
// Handle inbound close
if (frame.getOpCode() == OpCode.CLOSE)
// Handle inbound frame
if (frame.getOpCode() != OpCode.CLOSE)
{
connection.cancelDemand();
if (closeConnection)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
handler.onClosed(channelState.getCloseStatus());
connection.close();
}
};
handler.onFrame(frame, callback);
return;
}
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
if (channelState.isOutputOpen())
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
// this may race with a rare application close but errors are ignored
if (closeStatus==null)
closeStatus = CloseStatus.NO_CODE_STATUS;
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
}
}
};
handler.onFrame(frame, callback);
return;
}
// Handle the frame
handler.onFrame(frame, callback);
// Handle inbound CLOSE
connection.cancelDemand();
Callback closeCallback ;
if (closeConnection)
{
closeCallback = Callback.from(()-> closeConnection(null, channelState.getCloseStatus(), callback));
}
else
{
closeCallback = Callback.from(()->
{
if (channelState.isOutputOpen())
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
close(closeStatus==null ? CloseStatus.NO_CODE_STATUS : closeStatus, callback);
}
else
{
callback.succeeded();
}
},
x->processHandlerError(x,callback));
}
handler.onFrame(frame, closeCallback);
}
catch (Throwable t)
{
@ -753,9 +772,29 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
static class AbnormalCloseStatus extends CloseStatus
{
public AbnormalCloseStatus(int statusCode, String reasonPhrase)
final Throwable cause;
public AbnormalCloseStatus(int statusCode, Throwable cause)
{
super(statusCode, reasonPhrase);
super(statusCode, cause.getMessage());
this.cause = cause;
}
public Throwable getCause()
{
return cause;
}
public static Throwable getCause(CloseStatus status)
{
if (status instanceof AbnormalCloseStatus)
return ((AbnormalCloseStatus)status).getCause();
return null;
}
@Override
public String toString()
{
return "Abnormal" + super.toString() + ":" + cause;
}
}

View File

@ -58,23 +58,23 @@ public class WebSocketChannelState
{
synchronized (this)
{
if (_channelState != State.CONNECTED)
throw new IllegalStateException(_channelState.toString());
switch(_channelState)
{
case CONNECTED:
_channelState = State.OPEN;
break;
_channelState = State.OPEN;
case OSHUT:
case CLOSED:
// Already closed in onOpen handler
break;
default:
throw new IllegalStateException(_channelState.toString());
}
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
_channelState,
OpCode.name(_incomingContinuation),
OpCode.name(_outgoingContinuation),
_closeStatus);
}
public State getState()
{
@ -98,7 +98,7 @@ public class WebSocketChannelState
public boolean isOutputOpen()
{
State state = getState();
return (state==State.OPEN || state==State.ISHUT);
return (state==State.CONNECTED || state==State.OPEN || state==State.ISHUT);
}
public CloseStatus getCloseStatus()
@ -147,6 +147,7 @@ public class WebSocketChannelState
switch (_channelState)
{
case CONNECTED:
case OPEN:
_channelState = State.OSHUT;
return false;
@ -204,6 +205,16 @@ public class WebSocketChannelState
}
@Override
public String toString()
{
return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
_channelState,
OpCode.name(_incomingContinuation),
OpCode.name(_outgoingContinuation),
_closeStatus);
}
private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException
{
switch (opcode)

View File

@ -185,7 +185,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onIdleExpired()");
// treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout"));
channel.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout"),Callback.NOOP);
return true;
}
@ -201,7 +201,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onReadTimeout()");
// treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout));
channel.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout),Callback.NOOP);
return false;
}
@ -241,7 +241,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
referenced.release();
// notify session & endpoint
channel.processHandlerError(cause);
channel.processHandlerError(cause,NOOP);
}
});
}
@ -453,7 +453,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.warn(t.toString());
BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer();
channel.processConnectionError(t);
channel.processConnectionError(t,Callback.NOOP);
}
}
@ -494,8 +494,8 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.debug("onOpen() {}", this);
// Open Channel
channel.onOpen();
super.onOpen();
channel.onOpen();
}
@Override
@ -615,7 +615,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
public void onCompleteFailure(Throwable x)
{
super.onCompleteFailure(x);
channel.processConnectionError(x);
channel.processConnectionError(x,NOOP);
}
}
}

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import java.nio.ByteBuffer;
import static org.eclipse.jetty.websocket.core.OpCode.PONG;
/**
@ -35,7 +35,7 @@ import static org.eclipse.jetty.websocket.core.OpCode.PONG;
* NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some.
* </p>
*/
public class AbstractTestFrameHandler implements FrameHandler
public class AbstractTestFrameHandler implements SynchronousFrameHandler
{
private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class);
private byte partial = OpCode.UNDEFINED;

View File

@ -111,7 +111,7 @@ public class MessageHandlerTest
}
};
handler.onOpen(session);
handler.onOpen(session, NOOP);
}
@Test
@ -350,7 +350,7 @@ public class MessageHandlerTest
FutureCallback callback;
handler.setMaxTextMessageSize(4);
handler.onOpen(session);
handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, "Testing"), callback);
@ -369,7 +369,7 @@ public class MessageHandlerTest
FutureCallback callback;
handler.setMaxTextMessageSize(4);
handler.onOpen(session);
handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, "123"), callback);
@ -570,7 +570,7 @@ public class MessageHandlerTest
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
handler.onOpen(session);
handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, true, "Testing"), callback);
@ -589,7 +589,7 @@ public class MessageHandlerTest
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
handler.onOpen(session);
handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, false, "123"), callback);
@ -653,7 +653,7 @@ public class MessageHandlerTest
}
};
handler.onOpen(session);
handler.onOpen(session, NOOP);
FutureCallback callback;
@ -681,7 +681,7 @@ public class MessageHandlerTest
}
};
handler.onOpen(session);
handler.onOpen(session, NOOP);
FutureCallback callback;

View File

@ -0,0 +1,87 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.Callback;
public interface SynchronousFrameHandler extends FrameHandler
{
@Override
default void onOpen(CoreSession coreSession, Callback callback)
{
try
{
onOpen(coreSession);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onOpen(CoreSession coreSession) throws Exception {}
@Override
default void onFrame(Frame frame, Callback callback)
{
try
{
onFrame(frame);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onFrame(Frame frame) throws Exception {}
@Override
default void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
onClosed(closeStatus);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onClosed(CloseStatus closeStatus) throws Exception {}
@Override
default void onError(Throwable cause, Callback callback)
{
try
{
onError(cause);
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}
default void onError(Throwable cause) throws Exception {}
}

View File

@ -18,18 +18,18 @@
package org.eclipse.jetty.websocket.core;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
public class TestFrameHandler implements FrameHandler
public class TestFrameHandler implements SynchronousFrameHandler
{
private static Logger LOG = Log.getLogger(TestFrameHandler.class);
private static Logger LOG = Log.getLogger(SynchronousFrameHandler.class);
private CoreSession session;
@ -47,7 +47,7 @@ public class TestFrameHandler implements FrameHandler
}
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
this.session = coreSession;
@ -69,7 +69,7 @@ public class TestFrameHandler implements FrameHandler
}
@Override
public void onError(Throwable cause) throws Exception
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
}

View File

@ -88,7 +88,13 @@ public class WebSocketCloseTest extends WebSocketTester
assertTrue(server.handler.opened.await(10, TimeUnit.SECONDS));
assertThat(server.handler.getCoreSession().toString(), containsString("OPEN"));
assertThat(server.handler.state, containsString("CONNECTED"));
while(true)
{
Thread.yield();
if (server.handler.getCoreSession().toString().contains("OPEN"))
break;
}
LOG.info("Server: OPEN");
break;
@ -102,6 +108,12 @@ public class WebSocketCloseTest extends WebSocketTester
client = newClient(server.getLocalPort());
assertTrue(server.handler.opened.await(10, TimeUnit.SECONDS));
while(true)
{
Thread.yield();
if (server.handler.getCoreSession().toString().contains("OPEN"))
break;
}
server.handler.getCoreSession().demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
@ -123,6 +135,12 @@ public class WebSocketCloseTest extends WebSocketTester
client = newClient(server.getLocalPort());
assertTrue(server.handler.opened.await(10, TimeUnit.SECONDS));
while(true)
{
Thread.yield();
if (server.handler.getCoreSession().toString().contains("OPEN"))
break;
}
server.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL));
Frame frame = receiveFrame(client.getInputStream());
@ -138,7 +156,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void serverClose_ICLOSED() throws Exception
public void serverClose_ISHUT() throws Exception
{
setup(State.ISHUT);
@ -152,7 +170,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void serverDifferentClose_ICLOSED() throws Exception
public void serverDifferentClose_ISHUT() throws Exception
{
setup(State.ISHUT);
@ -167,24 +185,21 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void serverFailClose_ICLOSED() throws Exception
public void serverFailClose_ISHUT() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{
setup(State.ISHUT);
server.handler.receivedCallback.poll().failed(new Exception("test failure"));
setup(State.ISHUT);
server.handler.receivedCallback.poll().failed(new Exception("test failure"));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SERVER_ERROR));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SERVER_ERROR));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@Test
public void clientClose_OCLOSED() throws Exception
public void clientClose_OSHUT() throws Exception
{
setup(State.OSHUT);
server.handler.getCoreSession().demand(1);
@ -199,7 +214,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientDifferentClose_OCLOSED() throws Exception
public void clientDifferentClose_OSHUT() throws Exception
{
setup(State.OSHUT);
server.handler.getCoreSession().demand(1);
@ -214,7 +229,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientCloseServerFailClose_OCLOSED() throws Exception
public void clientCloseServerFailClose_OSHUT() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{
@ -244,7 +259,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientSendsBadFrame_OCLOSED() throws Exception
public void clientSendsBadFrame_OSHUT() throws Exception
{
setup(State.OSHUT);
@ -256,14 +271,12 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientSendsBadFrame_ICLOSED() throws Exception
public void clientSendsBadFrame_ISHUT() throws Exception
{
setup(State.ISHUT);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.close();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
@ -284,7 +297,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientAborts_OCLOSED() throws Exception
public void clientAborts_OSHUT() throws Exception
{
setup(State.OSHUT);
@ -297,14 +310,12 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void clientAborts_ICLOSED() throws Exception
public void clientAborts_ISHUT() throws Exception
{
setup(State.ISHUT);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.getCoreSession().demand(1);
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.close();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
@ -328,7 +339,7 @@ public class WebSocketCloseTest extends WebSocketTester
}
@Test
public void onFrameThrows_OCLOSED() throws Exception
public void onFrameThrows_OSHUT() throws Exception
{
setup(State.OSHUT);
@ -344,9 +355,10 @@ public class WebSocketCloseTest extends WebSocketTester
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
}
static class TestFrameHandler implements FrameHandler
static class TestFrameHandler implements SynchronousFrameHandler
{
private CoreSession session;
String state;
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected BlockingQueue<Callback> receivedCallback = new BlockingArrayQueue<>();
@ -368,7 +380,8 @@ public class WebSocketCloseTest extends WebSocketTester
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
this.session = coreSession;
session = coreSession;
state = session.toString();
opened.countDown();
}
@ -376,6 +389,7 @@ public class WebSocketCloseTest extends WebSocketTester
public void onFrame(Frame frame, Callback callback)
{
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
state = session.toString();
receivedCallback.offer(callback);
receivedFrames.offer(Frame.copy(frame));
@ -387,14 +401,16 @@ public class WebSocketCloseTest extends WebSocketTester
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
state = session.toString();
this.closeStatus = closeStatus;
closed.countDown();
}
@Override
public void onError(Throwable cause) throws Exception
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
state = session.toString();
}
@Override
@ -410,6 +426,7 @@ public class WebSocketCloseTest extends WebSocketTester
frame.setPayload(text);
getCoreSession().sendFrame(frame, NOOP, false);
state = session.toString();
}
}

View File

@ -0,0 +1,371 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests of a core server with a fake client
*/
public class WebSocketOpenTest extends WebSocketTester
{
private static Logger LOG = Log.getLogger(WebSocketOpenTest.class);
private WebSocketServer server;
private TestFrameHandler serverHandler;
private Socket client;
@AfterEach
public void after() throws Exception
{
if (server != null)
server.stop();
}
public void setup(BiFunction<FrameHandler.CoreSession,Callback,Void> onOpen) throws Exception
{
serverHandler = new TestFrameHandler(onOpen);
server = new WebSocketServer(0, serverHandler);
server.start();
client = newClient(server.getLocalPort());
}
@Test
public void testSendFrameInOnOpen() throws Exception
{
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
WebSocketOpenTest.TestFrameHandler.sendText(s,"Hello", c);
s.demand(1);
return null;
});
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello"));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.NORMAL));
}
@Test
public void testFailureInOnOpen() throws Exception
{
try(StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{
setup((s, c) ->
{
assertThat(s.toString(), containsString("CONNECTED"));
c.failed(new Exception("Test Exception in onOpen"));
return null;
});
assertTrue(server.handler.onError.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.error, notNullValue());
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
}
@Test
public void testCloseInOnOpen() throws Exception
{
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
s.close(CloseStatus.SHUTDOWN,"Test close in onOpen", c);
s.demand(1);
return null;
});
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.SHUTDOWN));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
}
@Test
public void testAsyncOnOpen() throws Exception
{
Exchanger<FrameHandler.CoreSession> sx = new Exchanger<>();
Exchanger<Callback> cx = new Exchanger<>();
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
try
{
sx.exchange(s);
cx.exchange(c);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return null;
});
FrameHandler.CoreSession session = sx.exchange(null);
Callback onOpenCallback = cx.exchange(null);
Thread.sleep(100);
// Can send while onOpen is active
WebSocketOpenTest.TestFrameHandler.sendText(session,"Hello", NOOP);
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello"));
// But cannot receive
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
// Can't demand until open
assertThrows(Throwable.class, () -> session.demand(1));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
// Succeeded moves to OPEN state and still does not read CLOSE frame
onOpenCallback.succeeded();
assertThat(session.toString(),containsString("OPEN"));
// Demand start receiving frames
session.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
// Closed handled normally
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.NORMAL));
}
static class TestFrameHandler implements SynchronousFrameHandler
{
private CoreSession session;
private BiFunction<CoreSession,Callback,Void> onOpen;
private CloseStatus closeStatus;
private CountDownLatch onClosed = new CountDownLatch(1);
private Throwable error;
private CountDownLatch onError = new CountDownLatch(1);
private Frame frame;
private CountDownLatch onFrame = new CountDownLatch(1);
public CoreSession getCoreSession()
{
synchronized (this)
{
return session;
}
}
TestFrameHandler(BiFunction<CoreSession,Callback,Void> onOpen)
{
this.onOpen = onOpen;
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.info("onOpen {}", coreSession);
synchronized (this)
{
session = coreSession;
}
onOpen.apply(coreSession, callback);
}
@Override
public void onFrame(Frame frame, Callback callback)
{
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
callback.succeeded();
if (onFrame.getCount()==1)
{
this.frame = frame;
onFrame.countDown();
}
}
@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
if (onError.getCount()!=1)
throw new IllegalStateException();
error = cause;
onError.countDown();
}
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
if (onClosed.getCount()!=1)
throw new IllegalStateException();
this.closeStatus = closeStatus;
onClosed.countDown();
}
@Override
public boolean isDemanding()
{
return true;
}
public void sendText(String text)
{
sendText(session, text);
}
public void sendText(String text, Callback callback)
{
sendText(session, text, callback);
}
static void sendText(FrameHandler.CoreSession session, String text)
{
sendText(session, text, NOOP);
}
static void sendText(FrameHandler.CoreSession session, String text, Callback callback)
{
Frame frame = new Frame(OpCode.TEXT);
frame.setFin(true);
frame.setPayload(text);
session.sendFrame(frame, callback, false);
}
}
static class WebSocketServer extends AbstractLifeCycle
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private final TestFrameHandler handler;
public void doStart() throws Exception
{
server.start();
}
public void doStop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public WebSocketServer(int port, TestFrameHandler frameHandler)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory());
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(),
connector.getByteBufferPool(), frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, NOOP, false);
}
public void sendText(String text)
{
LOG.info("sending {}...", text);
WebSocketOpenTest.TestFrameHandler.sendText(handler.session, text);
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
}

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.core.chat;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@ -34,14 +43,6 @@ import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.eclipse.jetty.util.Callback.NOOP;
public class ChatWebSocketServer
@ -67,14 +68,12 @@ public class ChatWebSocketServer
// + MUST return the FrameHandler or null or exception?
return new MessageHandler()
{
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.debug("onOpen {}", coreSession);
setMaxTextMessageSize(2 * 1024);
super.onOpen(coreSession);
members.add(this);
super.onOpen(coreSession, Callback.from(()->{members.add(this); callback.succeeded();},x->callback.failed(x)));
}
@Override
@ -92,10 +91,10 @@ public class ChatWebSocketServer
}
@Override
public void onClosed(CloseStatus closeStatus)
public void onClosed(CloseStatus closeStatus, Callback callback)
{
LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus);
super.onClosed(closeStatus, Callback.from(()->members.remove(this),callback));
members.remove(this);
}
};

View File

@ -113,6 +113,7 @@ public class WebSocketClientServerTest
{
LOG.info("channel aborted");
getCoreSession().abort();
callback.failed(new Exception());
}
else
{

View File

@ -162,9 +162,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler()
{
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(1);
}
@ -291,9 +292,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler()
{
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(3);
}
@ -349,9 +351,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler()
{
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2);
}
@ -423,9 +426,10 @@ public class WebSocketServerTest extends WebSocketTester
TestFrameHandler serverHandler = new TestFrameHandler()
{
@Override
public void onOpen(CoreSession coreSession) throws Exception
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2);
}