Merge pull request #3262 from eclipse/jetty-10.0.x-3167-2175-websocket-close

Jetty 10.0.x 3167 2175 websocket close clean up
This commit is contained in:
Greg Wilkins 2019-01-23 10:32:16 +11:00 committed by GitHub
commit 6fc1cb5ca6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 756 additions and 725 deletions

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
/** /**
* Example of setting up a javax.websocket server with Jetty embedded * Example of setting up a javax.websocket server with Jetty embedded
@ -59,7 +59,7 @@ public class WebSocketJsrServer
handlers.addHandler(context); handlers.addHandler(context);
// Enable javax.websocket configuration for the context // Enable javax.websocket configuration for the context
ServerContainer wsContainer = JavaxWebSocketServerContainerInitializer ServerContainer wsContainer = JavaxWebSocketServletContainerInitializer
.configureContext(context); .configureContext(context);
// Add your websockets to the container // Add your websockets to the container

View File

@ -128,7 +128,18 @@ public interface Callback extends Invocable
} }
}; };
} }
static Callback from(Runnable completed)
{
return new Completing()
{
public void completed()
{
completed.run();
}
};
}
class Completing implements Callback class Completing implements Callback
{ {
@Override @Override

View File

@ -18,24 +18,6 @@
package org.eclipse.jetty.websocket.javax.common; package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.javax.common.util.ReflectUtils;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.MessageHandler;
import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.security.Principal; import java.security.Principal;
@ -48,6 +30,25 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.MessageHandler;
import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.javax.common.util.ReflectUtils;
/** /**
* Client Session for the JSR. * Client Session for the JSR.
*/ */
@ -535,7 +536,7 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return coreSession.isOpen(); return coreSession.isOutputOpen();
} }
/** /**

View File

@ -22,7 +22,7 @@ import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.webapp.Configuration; import org.eclipse.jetty.webapp.Configuration;
import org.eclipse.jetty.websocket.javax.server.ContainerDefaultConfigurator; import org.eclipse.jetty.websocket.javax.server.ContainerDefaultConfigurator;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketConfiguration; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketConfiguration;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
module org.eclipse.jetty.websocket.javax.server module org.eclipse.jetty.websocket.javax.server
{ {
@ -42,7 +42,7 @@ module org.eclipse.jetty.websocket.javax.server
requires org.eclipse.jetty.websocket.javax.client; requires org.eclipse.jetty.websocket.javax.client;
requires org.eclipse.jetty.websocket.servlet; requires org.eclipse.jetty.websocket.servlet;
provides ServletContainerInitializer with JavaxWebSocketServerContainerInitializer; provides ServletContainerInitializer with JavaxWebSocketServletContainerInitializer;
provides ServerEndpointConfig.Configurator with ContainerDefaultConfigurator; provides ServerEndpointConfig.Configurator with ContainerDefaultConfigurator;
provides Configuration with JavaxWebSocketConfiguration; provides Configuration with JavaxWebSocketConfiguration;
} }

View File

@ -89,10 +89,10 @@ public class JavaxWebSocketServerContainer
if (container==null) if (container==null)
{ {
// Find Pre-Existing (Shared?) HttpClient and/or executor // Find Pre-Existing (Shared?) HttpClient and/or executor
HttpClient httpClient = (HttpClient)servletContext.getAttribute(JavaxWebSocketServerContainerInitializer.HTTPCLIENT_ATTRIBUTE); HttpClient httpClient = (HttpClient)servletContext.getAttribute(JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE);
if (httpClient == null) if (httpClient == null)
httpClient = (HttpClient)contextHandler.getServer() httpClient = (HttpClient)contextHandler.getServer()
.getAttribute(JavaxWebSocketServerContainerInitializer.HTTPCLIENT_ATTRIBUTE); .getAttribute(JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE);
Executor executor = httpClient == null?null:httpClient.getExecutor(); Executor executor = httpClient == null?null:httpClient.getExecutor();
if (executor == null) if (executor == null)
@ -124,7 +124,7 @@ public class JavaxWebSocketServerContainer
private List<ServerEndpointConfig> deferredEndpointConfigs; private List<ServerEndpointConfig> deferredEndpointConfigs;
/** /**
* Main entry point for {@link JavaxWebSocketServerContainerInitializer}. * Main entry point for {@link JavaxWebSocketServletContainerInitializer}.
* @param webSocketMapping the {@link WebSocketMapping} that this container belongs to * @param webSocketMapping the {@link WebSocketMapping} that this container belongs to
* @param httpClient the {@link HttpClient} instance to use * @param httpClient the {@link HttpClient} instance to use
*/ */

View File

@ -41,12 +41,12 @@ import org.eclipse.jetty.websocket.servlet.WebSocketMapping;
import org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter; import org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter;
@HandlesTypes({ ServerApplicationConfig.class, ServerEndpoint.class, Endpoint.class }) @HandlesTypes({ ServerApplicationConfig.class, ServerEndpoint.class, Endpoint.class })
public class JavaxWebSocketServerContainerInitializer implements ServletContainerInitializer public class JavaxWebSocketServletContainerInitializer implements ServletContainerInitializer
{ {
public static final String ENABLE_KEY = "org.eclipse.jetty.websocket.javax"; public static final String ENABLE_KEY = "org.eclipse.jetty.websocket.javax";
public static final String DEPRECATED_ENABLE_KEY = "org.eclipse.jetty.websocket.jsr356"; public static final String DEPRECATED_ENABLE_KEY = "org.eclipse.jetty.websocket.jsr356";
public static final String HTTPCLIENT_ATTRIBUTE = "org.eclipse.jetty.websocket.javax.HttpClient"; public static final String HTTPCLIENT_ATTRIBUTE = "org.eclipse.jetty.websocket.javax.HttpClient";
private static final Logger LOG = Log.getLogger(JavaxWebSocketServerContainerInitializer.class); private static final Logger LOG = Log.getLogger(JavaxWebSocketServletContainerInitializer.class);
/** /**
* Test a ServletContext for {@code init-param} or {@code attribute} at {@code keyName} for * Test a ServletContext for {@code init-param} or {@code attribute} at {@code keyName} for

View File

@ -1 +1 @@
org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer

View File

@ -18,6 +18,15 @@
package examples; package examples;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Objects;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -29,15 +38,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Objects;
/** /**
* Tool to help debug JSR based websocket circumstances reported around browsers. * Tool to help debug JSR based websocket circumstances reported around browsers.
@ -106,7 +107,7 @@ public class JsrBrowserDebugTool
holder.setInitParameter("dirAllowed", "true"); holder.setInitParameter("dirAllowed", "true");
server.setHandler(context); server.setHandler(context);
JavaxWebSocketServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(context); JavaxWebSocketServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(context);
container.addEndpoint(JsrBrowserSocket.class); container.addEndpoint(JsrBrowserSocket.class);
LOG.info("{} setup on port {}", this.getClass().getName(), port); LOG.info("{} setup on port {}", this.getClass().getName(), port);

View File

@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.javax.tests; package org.eclipse.jetty.websocket.javax.tests;
import java.net.URI;
import java.util.Map;
import java.util.function.BiConsumer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.OnMessage;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.pathmap.PathSpec; import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -38,23 +48,12 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.internal.Parser; import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketMapping;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.OnMessage;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import java.net.URI;
import java.util.Map;
import java.util.function.BiConsumer;
public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provider public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provider
{ {
@ -172,7 +171,7 @@ public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provi
{ {
servletContextHandler = new ServletContextHandler(server, "/", true, false); servletContextHandler = new ServletContextHandler(server, "/", true, false);
servletContextHandler.setContextPath("/"); servletContextHandler.setContextPath("/");
serverContainer = JavaxWebSocketServerContainerInitializer.configureContext(servletContextHandler); serverContainer = JavaxWebSocketServletContainerInitializer.configureContext(servletContextHandler);
configureServletContextHandler(servletContextHandler); configureServletContextHandler(servletContextHandler);
return servletContextHandler; return servletContextHandler;
} }

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.javax.tests.client.misbehaving; package org.eclipse.jetty.websocket.javax.tests.client.misbehaving;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.javax.tests.CoreServer; import org.eclipse.jetty.websocket.javax.tests.CoreServer;
@ -25,12 +32,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;

View File

@ -18,24 +18,25 @@
package org.eclipse.jetty.websocket.javax.tests.quotes; package org.eclipse.jetty.websocket.javax.tests.quotes;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.websocket.OnMessage;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer; import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.LocalServer; import org.eclipse.jetty.websocket.javax.tests.LocalServer;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.OnMessage;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -73,7 +74,7 @@ public class QuotesDecoderTextStreamTest
@Override @Override
protected void configureServletContextHandler(ServletContextHandler context) throws Exception protected void configureServletContextHandler(ServletContextHandler context) throws Exception
{ {
ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(context); ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(context);
container.addEndpoint(QuotesEchoStringSocket.class); container.addEndpoint(QuotesEchoStringSocket.class);
} }
}; };

View File

@ -18,15 +18,12 @@
package org.eclipse.jetty.websocket.javax.tests.server; package org.eclipse.jetty.websocket.javax.tests.server;
import org.eclipse.jetty.server.Server; import java.lang.management.ManagementFactory;
import org.eclipse.jetty.server.ServerConnector; import java.lang.management.MemoryMXBean;
import org.eclipse.jetty.servlet.ServletContextHandler; import java.lang.management.MemoryUsage;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import java.net.URI;
import org.junit.jupiter.api.AfterEach; import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.BeforeEach; import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import javax.websocket.ContainerProvider; import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint; import javax.websocket.Endpoint;
@ -36,12 +33,16 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerContainer; import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean; import org.eclipse.jetty.server.Server;
import java.lang.management.MemoryUsage; import org.eclipse.jetty.server.ServerConnector;
import java.net.URI; import org.eclipse.jetty.servlet.ServletContextHandler;
import java.util.concurrent.CountDownLatch; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
@ -79,7 +80,7 @@ public class MemoryUsageTest
server.addConnector(connector); server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/", true, false); ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(context); ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(context);
ServerEndpointConfig config = ServerEndpointConfig.Builder.create(BasicEndpoint.class, "/").build(); ServerEndpointConfig config = ServerEndpointConfig.Builder.create(BasicEndpoint.class, "/").build();
container.addEndpoint(config); container.addEndpoint(config);

View File

@ -18,18 +18,9 @@
package org.eclipse.jetty.websocket.javax.tests.server; package org.eclipse.jetty.websocket.javax.tests.server;
import org.eclipse.jetty.servlet.ServletContextHandler; import java.io.IOException;
import org.eclipse.jetty.util.log.Log; import java.util.ArrayList;
import org.eclipse.jetty.util.log.Logger; import java.util.List;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.LocalServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.websocket.OnError; import javax.websocket.OnError;
import javax.websocket.OnMessage; import javax.websocket.OnMessage;
@ -37,9 +28,19 @@ import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.server.ServerContainer; import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList; import org.eclipse.jetty.servlet.ServletContextHandler;
import java.util.List; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.LocalServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/** /**
* Sends raw TEXT or BINARY messages to server. * Sends raw TEXT or BINARY messages to server.
@ -115,7 +116,7 @@ public class PartialEchoTest
@Override @Override
protected void configureServletContextHandler(ServletContextHandler context) throws Exception protected void configureServletContextHandler(ServletContextHandler context) throws Exception
{ {
ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(context); ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(context);
container.addEndpoint(PartialTextSocket.class); container.addEndpoint(PartialTextSocket.class);
container.addEndpoint(PartialTextSessionSocket.class); container.addEndpoint(PartialTextSessionSocket.class);
} }

View File

@ -18,15 +18,14 @@
package org.eclipse.jetty.websocket.javax.tests.server; package org.eclipse.jetty.websocket.javax.tests.server;
import org.eclipse.jetty.client.HttpClient; import java.io.IOException;
import org.eclipse.jetty.server.Server; import java.io.InputStream;
import org.eclipse.jetty.servlet.ServletContextHandler; import java.io.InputStreamReader;
import org.eclipse.jetty.util.IO; import java.io.StringWriter;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import java.net.HttpURLConnection;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer; import java.net.URI;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer; import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.javax.tests.WSURI; import java.util.concurrent.Executor;
import org.junit.jupiter.api.Test;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
@ -40,16 +39,18 @@ import javax.websocket.OnMessage;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import static org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainerInitializer.HTTPCLIENT_ATTRIBUTE; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.WSURI;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer.HTTPCLIENT_ATTRIBUTE;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -159,7 +160,7 @@ public class WebSocketServerContainerExecutorTest
// Using JSR356 Server Techniques to connectToServer() // Using JSR356 Server Techniques to connectToServer()
contextHandler.addServlet(ServerConnectServlet.class, "/connect"); contextHandler.addServlet(ServerConnectServlet.class, "/connect");
javax.websocket.server.ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(contextHandler); javax.websocket.server.ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(contextHandler);
container.addEndpoint(EchoSocket.class); container.addEndpoint(EchoSocket.class);
try try
{ {
@ -188,7 +189,7 @@ public class WebSocketServerContainerExecutorTest
// Using JSR356 Server Techniques to connectToServer() // Using JSR356 Server Techniques to connectToServer()
contextHandler.addServlet(ServerConnectServlet.class, "/connect"); contextHandler.addServlet(ServerConnectServlet.class, "/connect");
javax.websocket.server.ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(contextHandler); javax.websocket.server.ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(contextHandler);
container.addEndpoint(EchoSocket.class); container.addEndpoint(EchoSocket.class);
try try
{ {
@ -218,7 +219,7 @@ public class WebSocketServerContainerExecutorTest
// Using JSR356 Server Techniques to connectToServer() // Using JSR356 Server Techniques to connectToServer()
contextHandler.addServlet(ServerConnectServlet.class, "/connect"); contextHandler.addServlet(ServerConnectServlet.class, "/connect");
javax.websocket.server.ServerContainer container = JavaxWebSocketServerContainerInitializer.configureContext(contextHandler); javax.websocket.server.ServerContainer container = JavaxWebSocketServletContainerInitializer.configureContext(contextHandler);
container.addEndpoint(EchoSocket.class); container.addEndpoint(EchoSocket.class);
try try
{ {

View File

@ -18,6 +18,18 @@
package org.eclipse.jetty.websocket.client; package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DecoratedObjectFactory;
@ -34,18 +46,6 @@ import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy
{ {
private final WebSocketCoreClient coreClient; private final WebSocketCoreClient coreClient;
@ -93,9 +93,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
{ {
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, null, toUri, websocket); return connect(websocket, toUri, null);
coreClient.connect(upgradeRequest);
return upgradeRequest.getFutureSession();
} }
/** /**
@ -107,7 +105,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
* @return the future for the session, available on success of connect * @return the future for the session, available on success of connect
* @throws IOException if unable to connect * @throws IOException if unable to connect
*/ */
public CompletableFuture<Session> connect(Object websocket, URI toUri, org.eclipse.jetty.websocket.api.UpgradeRequest request) throws IOException public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request) throws IOException
{ {
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, request, toUri, websocket); ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, request, toUri, websocket);
coreClient.connect(upgradeRequest); coreClient.connect(upgradeRequest);

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.common; package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
@ -27,11 +32,6 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
public class WebSocketSessionImpl implements Session, Dumpable public class WebSocketSessionImpl implements Session, Dumpable
{ {
private final FrameHandler.CoreSession coreSession; private final FrameHandler.CoreSession coreSession;
@ -160,7 +160,7 @@ public class WebSocketSessionImpl implements Session, Dumpable
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return remoteEndpoint.getCoreSession().isOpen(); return remoteEndpoint.getCoreSession().isOutputOpen();
} }
@Override @Override

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
/** /**
* Representation of a WebSocket Close (status code &amp; reason) * Representation of a WebSocket Close (status code &amp; reason)
*/ */
@ -162,6 +162,15 @@ public class CloseStatus
return; return;
} }
public static CloseStatus getCloseStatus(Frame frame)
{
if (frame instanceof CloseStatus.Supplier)
return ((CloseStatus.Supplier)frame).getCloseStatus();
if (frame.getOpCode()==OpCode.CLOSE)
return new CloseStatus(frame);
return null;
}
public int getCode() public int getCode()
{ {
return code; return code;
@ -184,7 +193,7 @@ public class CloseStatus
int len = 2; // status code int len = 2; // status code
byte reasonBytes[] = null; byte[] reasonBytes = null;
if (reason != null) if (reason != null)
{ {
@ -198,7 +207,7 @@ public class CloseStatus
ByteBuffer buf = BufferUtil.allocate(len); ByteBuffer buf = BufferUtil.allocate(len);
BufferUtil.flipToFill(buf); BufferUtil.flipToFill(buf);
buf.put((byte)((statusCode >>> 8) & 0xFF)); buf.put((byte)((statusCode >>> 8) & 0xFF));
buf.put((byte)((statusCode >>> 0) & 0xFF)); buf.put((byte)(statusCode & 0xFF));
if ((reasonBytes != null) && (reasonBytes.length > 0)) if ((reasonBytes != null) && (reasonBytes.length > 0))
{ {
@ -265,19 +274,19 @@ public class CloseStatus
public Frame toFrame() public Frame toFrame()
{ {
return toFrame(code, reason); if (isTransmittableStatusCode(code))
return new CloseFrame(this, OpCode.CLOSE, true, asPayloadBuffer(code, reason));
return new CloseFrame(this, OpCode.CLOSE);
} }
public static Frame toFrame(int closeStatus) public static Frame toFrame(int closeStatus)
{ {
return toFrame(closeStatus, null); return new CloseStatus(closeStatus).toFrame();
} }
public static Frame toFrame(int closeStatus, String reason) public static Frame toFrame(int closeStatus, String reason)
{ {
if (isTransmittableStatusCode(closeStatus)) return new CloseStatus(closeStatus, reason).toFrame();
return new Frame(OpCode.CLOSE, true, asPayloadBuffer(closeStatus, reason));
return new Frame(OpCode.CLOSE);
} }
public static String codeString(int closeStatus) public static String codeString(int closeStatus)
@ -324,4 +333,27 @@ public class CloseStatus
return String.format("{%04d=%s,%s}", code, codeString(code), reason); return String.format("{%04d=%s,%s}", code, codeString(code), reason);
} }
public interface Supplier
{
CloseStatus getCloseStatus();
}
class CloseFrame extends Frame implements CloseStatus.Supplier
{
public CloseFrame(CloseStatus closeStatus, byte opcode)
{
super(opcode);
}
public CloseFrame(CloseStatus closeStatus, byte opCode, boolean fin, ByteBuffer payload)
{
super(opCode, fin, payload);
}
@Override
public CloseStatus getCloseStatus()
{
return CloseStatus.this;
}
}
} }

View File

@ -270,7 +270,7 @@ public interface FrameHandler extends IncomingFrames
/** /**
* @return True if the websocket is open outbound * @return True if the websocket is open outbound
*/ */
boolean isOpen(); boolean isOutputOpen();
/** /**
* If using BatchMode.ON or BatchMode.AUTO, trigger a flush of enqueued / batched frames. * If using BatchMode.ON or BatchMode.AUTO, trigger a flush of enqueued / batched frames.
@ -374,7 +374,7 @@ public interface FrameHandler extends IncomingFrames
} }
@Override @Override
public boolean isOpen() public boolean isOutputOpen()
{ {
return false; return false;
} }

View File

@ -19,18 +19,15 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Queue;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
@ -51,8 +48,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
{ {
private static final Logger LOG = Log.getLogger(ExtensionStack.class); private static final Logger LOG = Log.getLogger(ExtensionStack.class);
private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher();
private final WebSocketExtensionRegistry factory; private final WebSocketExtensionRegistry factory;
private List<Extension> extensions; private List<Extension> extensions;
private IncomingFrames incoming; private IncomingFrames incoming;
@ -198,14 +193,12 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
{ {
if (outgoing == null) if (outgoing == null)
throw new IllegalStateException(); throw new IllegalStateException();
FrameEntry entry = new FrameEntry(frame, callback, batch);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry); LOG.debug("Extending out {} {} {}", frame, callback, batch);
offerEntry(entry); outgoing.sendFrame(frame, callback, batch);
flusher.iterate();
} }
public void connect(IncomingFrames incoming, OutgoingFrames outgoing, WebSocketChannel webSocketChannel) public void initialize(IncomingFrames incoming, OutgoingFrames outgoing, WebSocketChannel webSocketChannel)
{ {
if (extensions == null) if (extensions == null)
throw new IllegalStateException(); throw new IllegalStateException();
@ -224,30 +217,6 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
extension.setWebSocketChannel(webSocketChannel); extension.setWebSocketChannel(webSocketChannel);
} }
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private int getQueueSize()
{
synchronized (this)
{
return entries.size();
}
}
@Override @Override
public String dump() public String dump()
{ {
@ -263,16 +232,14 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
@Override @Override
public String dumpSelf() public String dumpSelf()
{ {
return String.format("%s@%x[size=%d,queueSize=%d]", getClass().getSimpleName(), hashCode(), extensions.size(), getQueueSize()); return String.format("%s@%x[size=%d]", getClass().getSimpleName(), hashCode(), extensions.size());
} }
@Override @Override
public String toString() public String toString()
{ {
StringBuilder s = new StringBuilder(); StringBuilder s = new StringBuilder();
s.append("ExtensionStack["); s.append("ExtensionStack[extensions=");
s.append("queueSize=").append(getQueueSize());
s.append(",extensions=");
if (extensions == null) if (extensions == null)
{ {
s.append("<null>"); s.append("<null>");
@ -304,94 +271,4 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
s.append("]"); s.append("]");
return s.toString(); return s.toString();
} }
private class Flusher extends IteratingCallback implements Callback
{
private FrameEntry current;
@Override
protected Action process() throws Exception
{
current = pollEntry();
if (current == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Entering IDLE");
return Action.IDLE;
}
if (LOG.isDebugEnabled())
LOG.debug("Processing {}", current);
outgoing.sendFrame(current.frame, this, current.batch);
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
throw new IllegalStateException("This IteratingCallback should never complete.");
}
@Override
protected void onCompleteFailure(Throwable x)
{
// This IteratingCallback never fails.
// The callback are those provided by WriteCallback (implemented
// below) and even in case of writeFailed() we call succeeded().
throw new IllegalStateException("This IteratingCallback should never fail.");
}
@Override
public void succeeded()
{
// Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order.
notifyCallbackSuccess(current.callback);
super.succeeded();
}
@Override
public void failed(Throwable cause)
{
// Notify first, the call succeeded() to drain the queue.
// We don't want to call failed(x) because that will put
// this flusher into a final state that cannot be exited,
// and the failure of a frame may not mean that the whole
// connection is now invalid.
notifyCallbackFailure(current.callback, cause);
super.succeeded();
}
private void notifyCallbackSuccess(Callback callback)
{
try
{
if (callback != null)
callback.succeeded();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
private void notifyCallbackFailure(Callback callback, Throwable failure)
{
try
{
if (callback != null)
callback.failed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
return "ExtensionStack$Flusher[" + (extensions == null?-1:extensions.size()) + "]";
}
}
} }

View File

@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -28,14 +36,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OpCode;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
public class FrameFlusher extends IteratingCallback public class FrameFlusher extends IteratingCallback
{ {
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY); public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
@ -49,8 +49,6 @@ public class FrameFlusher extends IteratingCallback
private final Deque<Entry> queue = new ArrayDeque<>(); private final Deque<Entry> queue = new ArrayDeque<>();
private final List<Entry> entries; private final List<Entry> entries;
private final List<ByteBuffer> buffers; private final List<ByteBuffer> buffers;
private boolean closed;
private Throwable terminated;
private ByteBuffer batchBuffer = null; private ByteBuffer batchBuffer = null;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
@ -67,25 +65,26 @@ public class FrameFlusher extends IteratingCallback
public void enqueue(Frame frame, Callback callback, boolean batch) public void enqueue(Frame frame, Callback callback, boolean batch)
{ {
Entry entry = new Entry(frame, callback, batch); Entry entry = new Entry(frame, callback, batch);
byte opCode = frame.getOpCode();
Throwable closed;
synchronized (this) synchronized (this)
{ {
closed = terminated; if (opCode == OpCode.PING || opCode == OpCode.PONG)
if (closed == null) queue.offerFirst(entry);
{ else
byte opCode = frame.getOpCode(); queue.offerLast(entry);
if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
else
queue.offerLast(entry);
}
} }
}
if (closed == null) public void onClose()
iterate(); {
else Throwable cause = null;
notifyCallbackFailure(callback, closed); synchronized (this)
{
if (!queue.isEmpty())
cause = new IOException("Closed");
}
if (cause!=null)
onCompleteFailure(cause);
} }
@Override @Override
@ -102,12 +101,6 @@ public class FrameFlusher extends IteratingCallback
if (succeedEntries() && batchBuffer != null) if (succeedEntries() && batchBuffer != null)
BufferUtil.clear(batchBuffer); BufferUtil.clear(batchBuffer);
if (closed)
return Action.SUCCEEDED;
if (terminated != null)
throw terminated;
while (!queue.isEmpty() && entries.size() <= maxGather) while (!queue.isEmpty() && entries.size() <= maxGather)
{ {
Entry entry = queue.poll(); Entry entry = queue.poll();
@ -167,7 +160,7 @@ public class FrameFlusher extends IteratingCallback
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} processed {} entries flush=%b batch=%s: {}", LOG.debug("{} processed {} entries flush={} batch={}: {}",
this, this,
entries.size(), entries.size(),
flush, flush,
@ -220,10 +213,7 @@ public class FrameFlusher extends IteratingCallback
notifyCallbackSuccess(entry.callback); notifyCallbackSuccess(entry.callback);
entry.release(); entry.release();
if (entry.frame.getOpCode() == OpCode.CLOSE) if (entry.frame.getOpCode() == OpCode.CLOSE)
{
terminate(new ClosedChannelException(), true);
endPoint.shutdownOutput(); endPoint.shutdownOutput();
}
} }
entries.clear(); entries.clear();
return hadEntries; return hadEntries;
@ -233,13 +223,8 @@ public class FrameFlusher extends IteratingCallback
public void onCompleteFailure(Throwable failure) public void onCompleteFailure(Throwable failure)
{ {
releaseAggregate(); releaseAggregate();
Throwable closed;
synchronized (this) synchronized (this)
{ {
closed = terminated;
if (closed == null)
terminated = failure;
entries.addAll(queue); entries.addAll(queue);
queue.clear(); queue.clear();
} }
@ -261,22 +246,6 @@ public class FrameFlusher extends IteratingCallback
} }
} }
public void terminate(Throwable cause, boolean close)
{
Throwable reason;
synchronized (this)
{
closed = close;
reason = terminated;
if (reason == null)
terminated = cause;
}
if (LOG.isDebugEnabled())
LOG.debug("{} {}", reason == null?"Terminating":"Terminated", this);
if (reason == null && !close)
iterate();
}
protected void notifyCallbackSuccess(Callback callback) protected void notifyCallbackSuccess(Callback callback)
{ {
try try
@ -312,12 +281,10 @@ public class FrameFlusher extends IteratingCallback
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x[queueSize=%d,aggregate=%s,terminated=%s]", return String.format("%s[queueSize=%d,aggregate=%s]",
getClass().getSimpleName(), super.toString(),
hashCode(),
getQueueSize(), getQueueSize(),
BufferUtil.toDetailString(batchBuffer), BufferUtil.toDetailString(batchBuffer));
terminated);
} }
private class Entry extends FrameEntry private class Entry extends FrameEntry
@ -353,7 +320,7 @@ public class FrameFlusher extends IteratingCallback
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s[%s,%s,%b,%s]", getClass().getSimpleName(), frame, callback, batch, terminated); return String.format("%s{%s,%s,%b}", getClass().getSimpleName(), frame, callback, batch);
} }
} }
} }

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.core.WebSocketException;
import java.io.Closeable; import java.io.Closeable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Supplier;
/** /**
* Parsing of a frames in WebSocket land. * Parsing of a frames in WebSocket land.
@ -374,7 +375,7 @@ public class Parser
.format("Parser@%x[s=%s,c=%d,o=0x%x,m=%s,l=%d]", hashCode(), state, cursor, firstByte, mask == null?"-":TypeUtil.toHexString(mask), payloadLength); .format("Parser@%x[s=%s,c=%d,o=0x%x,m=%s,l=%d]", hashCode(), state, cursor, firstByte, mask == null?"-":TypeUtil.toHexString(mask), payloadLength);
} }
public class ParsedFrame extends Frame implements Closeable public class ParsedFrame extends Frame implements Closeable, CloseStatus.Supplier
{ {
final CloseStatus closeStatus; final CloseStatus closeStatus;
final boolean releaseable; final boolean releaseable;
@ -404,6 +405,7 @@ public class Parser
bufferPool.release(getPayload()); bufferPool.release(getPayload());
} }
@Override
public CloseStatus getCloseStatus() public CloseStatus getCloseStatus()
{ {
return closeStatus; return closeStatus;

View File

@ -18,9 +18,21 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable; import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -40,16 +52,6 @@ import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame; import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
/** /**
* The Core WebSocket Session. * The Core WebSocket Session.
*/ */
@ -59,11 +61,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private final static CloseStatus NO_CODE = new CloseStatus(CloseStatus.NO_CODE); private final static CloseStatus NO_CODE = new CloseStatus(CloseStatus.NO_CODE);
private final Behavior behavior; private final Behavior behavior;
private final WebSocketChannelState state = new WebSocketChannelState(); private final WebSocketChannelState channelState = new WebSocketChannelState();
private final FrameHandler handler; private final FrameHandler handler;
private final Negotiated negotiated; private final Negotiated negotiated;
private final boolean demanding; private final boolean demanding;
private final FrameSequence outgoingSequence = new FrameSequence(); private final Flusher flusher = new Flusher();
private WebSocketConnection connection; private WebSocketConnection connection;
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT; private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
@ -81,7 +83,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
this.behavior = behavior; this.behavior = behavior;
this.negotiated = negotiated; this.negotiated = negotiated;
this.demanding = handler.isDemanding(); this.demanding = handler.isDemanding();
negotiated.getExtensions().connect(new IncomingState(), new OutgoingState(), this); negotiated.getExtensions().initialize(new IncomingAdaptor(), new OutgoingAdaptor(), this);
} }
/** /**
@ -155,7 +157,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
if (!(frame instanceof ParsedFrame)) // already check in parser if (!(frame instanceof ParsedFrame)) // already check in parser
new CloseStatus(frame.getPayload()); CloseStatus.getCloseStatus(frame); // return ignored as get used to validate there is a closeStatus
} }
} }
else else
@ -237,9 +239,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
} }
@Override @Override
public boolean isOpen() public boolean isOutputOpen()
{ {
return state.isOutOpen(); return channelState.isOutputOpen();
}
public boolean isClosed()
{
return channelState.isClosed();
} }
public void setWebSocketConnection(WebSocketConnection connection) public void setWebSocketConnection(WebSocketConnection connection)
@ -273,44 +280,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private void close(CloseStatus closeStatus, Callback callback, boolean batch) private void close(CloseStatus closeStatus, Callback callback, boolean batch)
{ {
if (state.onCloseOut(closeStatus)) sendFrame(closeStatus.toFrame(), callback, batch);
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
try
{
handler.onClosed(state.getCloseStatus());
}
catch (Throwable e)
{
try
{
handler.onError(e);
}
catch (Throwable e2)
{
e.addSuppressed(e2);
LOG.warn(e);
}
}
finally
{
connection.close();
}
}
};
}
if (LOG.isDebugEnabled())
{
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
}
Frame frame = closeStatus.toFrame();
negotiated.getExtensions().sendFrame(frame, callback, batch);
} }
@Override @Override
@ -321,99 +291,94 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void onClosed(Throwable cause) public void onClosed(Throwable cause)
{ {
onClosed(cause, new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString())); CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus);
} }
public void onClosed(Throwable cause, CloseStatus closeStatus) public void closeConnection(Throwable cause, CloseStatus closeStatus)
{ {
if (state.onClosed(closeStatus)) connection.cancelDemand();
{
connection.cancelDemand();
// Forward Errors to Local WebSocket EndPoint // Forward Errors to Local WebSocket EndPoint
if (cause!=null)
{
try try
{ {
handler.onError(cause); handler.onError(cause);
} }
catch (Throwable e) catch (Throwable e)
{ {
cause.addSuppressed(e); if (e != cause)
cause.addSuppressed(e);
LOG.warn(cause); LOG.warn(cause);
} }
try
{
handler.onClosed(closeStatus);
}
catch (Exception e)
{
LOG.warn(e);
}
}
}
/**
* Process an Error event seen by the Session and/or Connection
*
* @param cause the cause
*/
public void processError(Throwable cause)
{
CloseStatus closeStatus;
if (cause instanceof Utf8Appendable.NotUtf8Exception)
{
closeStatus = new CloseStatus(CloseStatus.BAD_PAYLOAD, cause.getMessage());
}
else if (cause instanceof SocketTimeoutException)
{
// A path often seen in Windows
closeStatus = new CloseStatus(CloseStatus.SHUTDOWN, cause.getMessage());
}
else if (cause instanceof IOException)
{
closeStatus = new CloseStatus(CloseStatus.PROTOCOL, cause.getMessage());
}
else if (cause instanceof SocketException)
{
// A path unique to Unix
closeStatus = new CloseStatus(CloseStatus.SHUTDOWN, cause.getMessage());
}
else if (cause instanceof CloseException)
{
CloseException ce = (CloseException)cause;
closeStatus = new CloseStatus(ce.getStatusCode(), ce.getMessage());
}
else if (cause instanceof WebSocketTimeoutException)
{
closeStatus = new CloseStatus(CloseStatus.SHUTDOWN, cause.getMessage());
}
else
{
LOG.warn("Unhandled Error (closing connection)", cause);
// Exception on end-user WS-Endpoint.
// Fast-fail & close connection with reason.
int statusCode = CloseStatus.SERVER_ERROR;
if (behavior == Behavior.CLIENT)
statusCode = CloseStatus.POLICY_VIOLATION;
closeStatus = new CloseStatus(statusCode, cause.getMessage());
} }
try try
{ {
// TODO can we avoid the illegal state exception in outClosed handler.onClosed(closeStatus);
close(closeStatus, Callback.NOOP, false);
} }
catch (IllegalStateException e) catch (Throwable e)
{ {
if (cause == null) LOG.warn(e);
cause = e;
else
cause.addSuppressed(e);
} }
onClosed(cause, closeStatus);
if (connection.getEndPoint().isOpen())
connection.close();
}
AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause)
{
int code;
if (cause instanceof ProtocolException)
code = CloseStatus.PROTOCOL;
else if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
code = CloseStatus.SHUTDOWN;
else if (behavior == Behavior.CLIENT)
code = CloseStatus.POLICY_VIOLATION;
else
code = CloseStatus.SERVER_ERROR;
return new AbnormalCloseStatus(code, cause.getMessage());
}
/**
* Process an Error that originated from the connection.
* For protocol causes, send and abnormal close frame
* otherwise just close the connection.
*
* @param cause the cause
*/
public void processConnectionError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause);
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, Callback.NOOP, false);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus);
}
/**
* Process an Error that originated from the handler.
* Send an abnormal close frame to ensure connection is closed.
*
* @param cause the cause
*/
public void processHandlerError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), Callback.NOOP, false);
} }
/** /**
@ -427,32 +392,32 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
try try
{ {
// Upgrade success // Upgrade success
state.onConnected(); channelState.onConnected();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED"); LOG.debug("ConnectionState: Transition to CONNECTED");
try // Open connection and handler
{ channelState.onOpen();
// Open connection and handler handler.onOpen(this);
state.onOpen(); if (!demanding)
handler.onOpen(this); connection.demand(1);
if (!demanding)
connection.demand(1);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to OPEN"); LOG.debug("ConnectionState: Transition to OPEN");
}
catch (Throwable t)
{
LOG.warn("Error during OPEN", t);
// TODO: this must trigger onError AND onClose
processError(new CloseException(CloseStatus.SERVER_ERROR, t));
}
} }
catch (Throwable t) catch (Throwable t)
{ {
processError(t); // Handle error LOG.warn("Error during OPEN", t);
try
{
handler.onError(t);
}
catch (Exception e)
{
t.addSuppressed(e);
}
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t));
} }
} }
@ -499,7 +464,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
try try
{ {
assertValidOutgoing(frame); assertValidOutgoing(frame);
outgoingSequence.check(frame.getOpCode(), frame.isFin());
} }
catch (Throwable ex) catch (Throwable ex)
{ {
@ -507,20 +471,60 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
return; return;
} }
if (frame.getOpCode() == OpCode.CLOSE) try
{ {
close(new CloseStatus(frame.getPayload()), callback, batch); synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
if (frame.getOpCode() == OpCode.CLOSE)
{
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (closeConnection)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
closeConnection(null, channelState.getCloseStatus());
}
};
}
}
flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.iterate();
} }
else catch (Throwable ex)
{ {
negotiated.getExtensions().sendFrame(frame, callback, batch); try
{
callback.failed(ex);
}
finally
{
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus)
closeConnection(null, closeStatus);
}
}
} }
} }
@Override @Override
public void flush(Callback callback) public void flush(Callback callback)
{ {
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false); synchronized(flusher)
{
flusher.queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false));
}
flusher.iterate();
} }
@Override @Override
@ -602,7 +606,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxTextMessageSize = maxSize; maxTextMessageSize = maxSize;
} }
private class IncomingState extends FrameSequence implements IncomingFrames private class IncomingAdaptor implements IncomingFrames
{ {
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
@ -611,59 +615,52 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}", LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}",
frame, callback, state, handler); frame, callback, channelState, handler);
check(frame.getOpCode(), frame.isFin()); boolean closeConnection = channelState.onIncomingFrame(frame);
if (state.isInOpen())
// Handle inbound close
if (frame.getOpCode() == OpCode.CLOSE)
{ {
// Handle inbound close connection.cancelDemand();
if (frame.getOpCode() == OpCode.CLOSE) if (closeConnection)
{ {
connection.cancelDemand();
CloseStatus closeStatus = ((ParsedFrame)frame).getCloseStatus();
if (state.onCloseIn(closeStatus))
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
handler.onClosed(state.getCloseStatus());
connection.close();
}
};
handler.onFrame(frame, callback);
return;
}
callback = new Callback.Nested(callback) callback = new Callback.Nested(callback)
{ {
@Override @Override
public void completed() public void completed()
{ {
// was a close sent by the handler? handler.onClosed(channelState.getCloseStatus());
if (state.isOutOpen()) connection.close();
{
// No!
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
return;
}
} }
}; };
handler.onFrame(frame, callback);
return;
} }
// Handle the frame callback = new Callback.Nested(callback)
handler.onFrame(frame, callback); {
} @Override
else public void completed()
{ {
if (LOG.isDebugEnabled()) if (channelState.isOutputOpen())
LOG.debug("Discarding post EOF frame - {}", frame); {
callback.failed(new EofException()); CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
// this may race with a rare application close but errors are ignored
if (closeStatus==null)
closeStatus = CloseStatus.NO_CODE_STATUS;
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
}
}
};
} }
// Handle the frame
handler.onFrame(frame, callback);
} }
catch (Throwable t) catch (Throwable t)
{ {
@ -672,14 +669,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
} }
} }
private class OutgoingState implements OutgoingFrames private class OutgoingAdaptor implements OutgoingFrames
{ {
@Override @Override
public void sendFrame(Frame frame, Callback callback, boolean batch) public void sendFrame(Frame frame, Callback callback, boolean batch)
{ {
try try
{ {
connection.sendFrame(frame, callback, batch); connection.enqueueFrame(frame, callback, batch);
} }
catch (ProtocolException e) catch (ProtocolException e)
{ {
@ -742,9 +739,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public String toString() public String toString()
{ {
return String.format("WSChannel@%x{%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", return String.format("WSChannel@%x{%s,%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s",
hashCode(), hashCode(),
state, behavior,
channelState,
negotiated, negotiated,
autoFragment, autoFragment,
inputBufferSize, inputBufferSize,
@ -752,4 +750,67 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxFrameSize, maxFrameSize,
handler); handler);
} }
static class AbnormalCloseStatus extends CloseStatus
{
public AbnormalCloseStatus(int statusCode, String reasonPhrase)
{
super(statusCode, reasonPhrase);
}
}
private class Flusher extends IteratingCallback
{
private final Queue<FrameEntry> queue = new ArrayDeque<>();
FrameEntry entry;
@Override
protected Action process() throws Throwable
{
synchronized (this)
{
entry = queue.poll();
}
if (entry==null)
return Action.IDLE;
negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
entry.callback.succeeded();
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
entry.callback.failed(cause);
Queue<FrameEntry> entries;
synchronized (this)
{
entries = new ArrayDeque<>(queue);
queue.clear();
}
entries.forEach(e-> failEntry(cause, e));
}
private void failEntry(Throwable cause, FrameEntry e)
{
try
{
e.callback.failed(cause);
}
catch(Throwable x)
{
if (cause != x)
cause.addSuppressed(x);
LOG.warn(cause);
}
}
}
} }

View File

@ -19,140 +19,212 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
/** /**
* Atomic Connection State * Atomic Connection State
*/ */
public class WebSocketChannelState public class WebSocketChannelState
{ {
private static class State enum State
{ {
final String name; CONNECTING,
final boolean inOpen; CONNECTED,
final boolean outOpen; OPEN,
final CloseStatus closeStatus; ISHUT,
OSHUT,
State(String name, boolean inOpen, boolean outOpen, CloseStatus closeStatus) CLOSED
{
this.name = name;
this.inOpen = inOpen;
this.outOpen = outOpen;
this.closeStatus = closeStatus;
}
@Override
public String toString()
{
return String.format("%s{i=%b o=%b c=%d}", name, inOpen, outOpen, closeStatus == null?-1:closeStatus.getCode());
}
} }
private static final State CONNECTING = new State("CONNECTING", false, false, null); private State _channelState = State.CONNECTING;
private static final State CONNECTED = new State("CONNECTED", true, true, null); private byte _incomingContinuation = OpCode.UNDEFINED;
private static final State OPEN = new State("OPEN", true, true, null); private byte _outgoingContinuation = OpCode.UNDEFINED;
CloseStatus _closeStatus = null;
private AtomicReference<State> state = new AtomicReference<>(CONNECTING);
public void onConnected() public void onConnected()
{ {
if (!state.compareAndSet(CONNECTING, CONNECTED)) synchronized (this)
throw new IllegalStateException(state.get().toString()); {
if (_channelState != State.CONNECTING)
throw new IllegalStateException(_channelState.toString());
_channelState = State.CONNECTED;
}
} }
public void onOpen() public void onOpen()
{ {
if (!state.compareAndSet(CONNECTED, OPEN)) synchronized (this)
throw new IllegalStateException(state.get().toString()); {
if (_channelState != State.CONNECTED)
throw new IllegalStateException(_channelState.toString());
_channelState = State.OPEN;
}
} }
@Override @Override
public String toString() public String toString()
{ {
return state.get().toString(); return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
_channelState,
OpCode.name(_incomingContinuation),
OpCode.name(_outgoingContinuation),
_closeStatus);
}
public State getState()
{
synchronized (this)
{
return _channelState;
}
} }
public boolean isClosed() public boolean isClosed()
{ {
State s = state.get(); return getState()==State.CLOSED;
return !s.inOpen && !s.outOpen;
} }
public boolean isInOpen() public boolean isInputOpen()
{ {
return state.get().inOpen; State state = getState();
return (state==State.OPEN || state==State.OSHUT);
} }
public boolean isOutOpen() public boolean isOutputOpen()
{ {
return state.get().outOpen; State state = getState();
return (state==State.OPEN || state==State.ISHUT);
} }
public CloseStatus getCloseStatus() public CloseStatus getCloseStatus()
{ {
return state.get().closeStatus; synchronized (this)
}
public boolean onCloseIn(CloseStatus closeStatus)
{
while (true)
{ {
State s = state.get(); return _closeStatus;
if (!s.inOpen)
throw new IllegalStateException(state.get().toString());
if (s.outOpen)
{
State closedIn = new State("ICLOSED", false, true, closeStatus);
if (state.compareAndSet(s, closedIn))
return false;
}
else
{
State closed = new State("CLOSED", false, false, closeStatus);
if (state.compareAndSet(s, closed))
return true;
}
}
}
public boolean onCloseOut(CloseStatus closeStatus)
{
while (true)
{
State s = state.get();
if (!s.outOpen)
throw new IllegalStateException(state.get().toString());
if (s.inOpen)
{
State closedOut = new State("OCLOSED", true, false, closeStatus);
if (state.compareAndSet(s, closedOut))
return false;
}
else
{
State closed = new State("CLOSED", false, false, closeStatus);
if (state.compareAndSet(s, closed))
return true;
}
} }
} }
public boolean onClosed(CloseStatus closeStatus) public boolean onClosed(CloseStatus closeStatus)
{ {
while (true) synchronized (this)
{ {
State s = state.get(); if (_channelState == State.CLOSED)
if (!s.outOpen && !s.inOpen)
return false; return false;
State newState = new State("CLOSED", false, false, closeStatus); _closeStatus = closeStatus;
if (state.compareAndSet(s, newState)) _channelState = State.CLOSED;
return true; return true;
}
}
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
synchronized (this)
{
if (!isOutputOpen())
{
if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus)
_channelState = State.CLOSED;
throw new IllegalStateException(_channelState.toString());
}
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
if (_closeStatus instanceof WebSocketChannel.AbnormalCloseStatus)
{
_channelState = State.CLOSED;
return true;
}
switch (_channelState)
{
case OPEN:
_channelState = State.OSHUT;
return false;
case ISHUT:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_outgoingContinuation = checkDataSequence(opcode, fin, _outgoingContinuation);
}
}
return false;
}
public boolean onIncomingFrame(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
synchronized (this)
{
if (!isInputOpen())
throw new IllegalStateException(_channelState.toString());
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
switch (_channelState)
{
case OPEN:
_channelState = State.ISHUT;
return false;
case OSHUT:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_incomingContinuation = checkDataSequence(opcode, fin, _incomingContinuation);
}
}
return false;
}
private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException
{
switch (opcode)
{
case OpCode.TEXT:
case OpCode.BINARY:
if (lastOpCode != OpCode.UNDEFINED)
throw new ProtocolException("DataFrame before fin==true");
if (!fin)
return opcode;
return OpCode.UNDEFINED;
case OpCode.CONTINUATION:
if (lastOpCode == OpCode.UNDEFINED)
throw new ProtocolException("CONTINUATION after fin==true");
if (fin)
return OpCode.UNDEFINED;
return lastOpCode;
default:
return lastOpCode;
} }
} }
} }

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
@ -31,21 +38,13 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException; import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException; import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executor;
/** /**
* Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket * Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket
*/ */
public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, OutgoingFrames, Runnable public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, Runnable
{ {
private final Logger LOG = Log.getLogger(this.getClass()); private final Logger LOG = Log.getLogger(this.getClass());
@ -168,23 +167,44 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onClose() of physical connection"); LOG.debug("onClose() of physical connection");
// TODO review all close paths if (!channel.isClosed())
IOException e = new IOException("Closed"); {
flusher.terminate(e, true); IOException e = new IOException("Closed");
channel.onClosed(e); channel.onClosed(e);
}
flusher.onClose();
super.onClose(); super.onClose();
} }
@Override @Override
public boolean onIdleExpired() public boolean onIdleExpired()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onIdleExpired()"); LOG.debug("onIdleExpired()");
channel.processError(new WebSocketTimeoutException("Connection Idle Timeout")); // treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Connection Idle Timeout"));
return true; return true;
} }
/**
* Event for no activity on connection (read or write)
*
* @return true to signal that the endpoint must be closed, false to keep the endpoint open
*/
@Override
protected boolean onReadTimeout(Throwable timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("onReadTimeout()");
// treat as a handler error because socket is still open
channel.processHandlerError(new WebSocketTimeoutException("Timeout on Read", timeout));
return false;
}
protected void onFrame(Parser.ParsedFrame frame) protected void onFrame(Parser.ParsedFrame frame)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -221,7 +241,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
referenced.release(); referenced.release();
// notify session & endpoint // notify session & endpoint
channel.processError(cause); channel.processHandlerError(cause);
} }
}); });
} }
@ -433,7 +453,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
LOG.warn(t.toString()); LOG.warn(t.toString());
BufferUtil.clear(networkBuffer.getBuffer()); BufferUtil.clear(networkBuffer.getBuffer());
releaseNetworkBuffer(); releaseNetworkBuffer();
channel.processError(t); channel.processConnectionError(t);
} }
} }
@ -478,18 +498,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
super.onOpen(); super.onOpen();
} }
/**
* Event for no activity on connection (read or write)
*
* @return true to signal that the endpoint must be closed, false to keep the endpoint open
*/
@Override
protected boolean onReadTimeout(Throwable timeout)
{
channel.processError(new WebSocketTimeoutException("Timeout on Read", timeout));
return false;
}
@Override @Override
public void setInputBufferSize(int inputBufferSize) public void setInputBufferSize(int inputBufferSize)
{ {
@ -577,8 +585,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
setInitialBuffer(prefilled); setInitialBuffer(prefilled);
} }
@Override /**
public void sendFrame(Frame frame, Callback callback, boolean batch) * Enqueue a Frame to be sent.
* @param frame The frame to queue
* @param callback The callback to call once the frame is sent
* @param batch True if batch mode is to be used
*/
void enqueueFrame(Frame frame, Callback callback, boolean batch)
{ {
if (channel.getBehavior() == Behavior.CLIENT) if (channel.getBehavior() == Behavior.CLIENT)
{ {
@ -588,6 +601,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
wsf.setMask(mask); wsf.setMask(mask);
} }
flusher.enqueue(frame, callback, batch); flusher.enqueue(frame, callback, batch);
flusher.iterate();
} }
private class Flusher extends FrameFlusher private class Flusher extends FrameFlusher
@ -601,7 +615,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
public void onCompleteFailure(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.onCompleteFailure(x); super.onCompleteFailure(x);
channel.processError(x); channel.processConnectionError(x);
} }
} }
} }

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -39,11 +44,6 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -65,7 +65,7 @@ public class WebSocketCloseTest extends WebSocketTester
enum State enum State
{ {
OPEN, ICLOSED, OCLOSED OPEN, ISHUT, OSHUT
} }
@AfterEach @AfterEach
@ -93,7 +93,7 @@ public class WebSocketCloseTest extends WebSocketTester
break; break;
} }
case ICLOSED: case ISHUT:
{ {
TestFrameHandler serverHandler = new TestFrameHandler(); TestFrameHandler serverHandler = new TestFrameHandler();
@ -109,12 +109,12 @@ public class WebSocketCloseTest extends WebSocketTester
assertNotNull(frame); assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("ICLOSED")); assertThat(server.handler.getCoreSession().toString(), containsString("ISHUT"));
LOG.info("Server: ICLOSED"); LOG.info("Server: ISHUT");
break; break;
} }
case OCLOSED: case OSHUT:
{ {
TestFrameHandler serverHandler = new TestFrameHandler(); TestFrameHandler serverHandler = new TestFrameHandler();
@ -129,8 +129,8 @@ public class WebSocketCloseTest extends WebSocketTester
assertNotNull(frame); assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("OCLOSED")); assertThat(server.handler.getCoreSession().toString(), containsString("OSHUT"));
LOG.info("Server: OCLOSED"); LOG.info("Server: OSHUT");
break; break;
} }
@ -140,7 +140,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void serverClose_ICLOSED() throws Exception public void serverClose_ICLOSED() throws Exception
{ {
setup(State.ICLOSED); setup(State.ISHUT);
server.handler.receivedCallback.poll().succeeded(); server.handler.receivedCallback.poll().succeeded();
Frame frame = receiveFrame(client.getInputStream()); Frame frame = receiveFrame(client.getInputStream());
@ -154,7 +154,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void serverDifferentClose_ICLOSED() throws Exception public void serverDifferentClose_ICLOSED() throws Exception
{ {
setup(State.ICLOSED); setup(State.ISHUT);
server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN)); server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN));
server.handler.receivedCallback.poll().succeeded(); server.handler.receivedCallback.poll().succeeded();
@ -171,7 +171,7 @@ public class WebSocketCloseTest extends WebSocketTester
{ {
try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class)) try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{ {
setup(State.ICLOSED); setup(State.ISHUT);
server.handler.receivedCallback.poll().failed(new Exception("test failure")); server.handler.receivedCallback.poll().failed(new Exception("test failure"));
Frame frame = receiveFrame(client.getInputStream()); Frame frame = receiveFrame(client.getInputStream());
@ -186,7 +186,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientClose_OCLOSED() throws Exception public void clientClose_OCLOSED() throws Exception
{ {
setup(State.OCLOSED); setup(State.OSHUT);
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS)); assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
@ -201,7 +201,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientDifferentClose_OCLOSED() throws Exception public void clientDifferentClose_OCLOSED() throws Exception
{ {
setup(State.OCLOSED); setup(State.OSHUT);
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.BAD_PAYLOAD), true)); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.BAD_PAYLOAD), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS)); assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
@ -218,7 +218,7 @@ public class WebSocketCloseTest extends WebSocketTester
{ {
try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class)) try (StacklessLogging stackless = new StacklessLogging(WebSocketChannel.class))
{ {
setup(State.OCLOSED); setup(State.OSHUT);
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS)); assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
@ -246,7 +246,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientSendsBadFrame_OCLOSED() throws Exception public void clientSendsBadFrame_OCLOSED() throws Exception
{ {
setup(State.OCLOSED); setup(State.OSHUT);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
@ -258,7 +258,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientSendsBadFrame_ICLOSED() throws Exception public void clientSendsBadFrame_ICLOSED() throws Exception
{ {
setup(State.ICLOSED); setup(State.ISHUT);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
@ -286,7 +286,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientAborts_OCLOSED() throws Exception public void clientAborts_OCLOSED() throws Exception
{ {
setup(State.OCLOSED); setup(State.OSHUT);
client.close(); client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
@ -299,7 +299,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void clientAborts_ICLOSED() throws Exception public void clientAborts_ICLOSED() throws Exception
{ {
setup(State.ICLOSED); setup(State.ISHUT);
client.close(); client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
@ -330,7 +330,7 @@ public class WebSocketCloseTest extends WebSocketTester
@Test @Test
public void onFrameThrows_OCLOSED() throws Exception public void onFrameThrows_OCLOSED() throws Exception
{ {
setup(State.OCLOSED); setup(State.OSHUT);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.BINARY, "binary", true)); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.BINARY, "binary", true));
@ -478,7 +478,7 @@ public class WebSocketCloseTest extends WebSocketTester
public boolean isOpen() public boolean isOpen()
{ {
return handler.getCoreSession().isOpen(); return handler.getCoreSession().isOutputOpen();
} }
} }
} }

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.core.client; package org.eclipse.jetty.websocket.core.client;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -45,11 +50,6 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -208,7 +208,7 @@ public class WebSocketClientServerTest
public boolean isOpen() public boolean isOpen()
{ {
return handler.getCoreSession().isOpen(); return handler.getCoreSession().isOutputOpen();
} }
} }
@ -272,7 +272,7 @@ public class WebSocketClientServerTest
public boolean isOpen() public boolean isOpen()
{ {
return handler.getCoreSession().isOpen(); return handler.getCoreSession().isOutputOpen();
} }
} }

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.websocket.core.extensions; package org.eclipse.jetty.websocket.core.extensions;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DecoratedObjectFactory;
@ -35,9 +38,6 @@ import org.eclipse.jetty.websocket.core.internal.IdentityExtension;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -80,7 +80,7 @@ public class ExtensionStackTest
// Setup Listeners // Setup Listeners
IncomingFrames session = new IncomingFramesCapture(); IncomingFrames session = new IncomingFramesCapture();
OutgoingFrames connection = new OutgoingFramesCapture(); OutgoingFrames connection = new OutgoingFramesCapture();
stack.connect(session, connection, null); stack.initialize(session, connection, null);
// Dump // Dump
LOG.debug("{}", stack.dump()); LOG.debug("{}", stack.dump());
@ -104,7 +104,7 @@ public class ExtensionStackTest
// Setup Listeners // Setup Listeners
IncomingFrames session = new IncomingFramesCapture(); IncomingFrames session = new IncomingFramesCapture();
OutgoingFrames connection = new OutgoingFramesCapture(); OutgoingFrames connection = new OutgoingFramesCapture();
stack.connect(session, connection, null); stack.initialize(session, connection, null);
// Dump // Dump
LOG.debug("{}", stack.dump()); LOG.debug("{}", stack.dump());

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.websocket.core.extensions; package org.eclipse.jetty.websocket.core.extensions;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -44,10 +48,6 @@ import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.eclipse.jetty.util.Callback.NOOP; import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -218,7 +218,7 @@ public class ValidationExtensionTest extends WebSocketTester
public boolean isOpen() public boolean isOpen()
{ {
return handler.getCoreSession().isOpen(); return handler.getCoreSession().isOutputOpen();
} }
} }
} }

View File

@ -18,6 +18,12 @@
package org.eclipse.jetty.websocket.core.server; package org.eclipse.jetty.websocket.core.server;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
@ -46,12 +52,6 @@ import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
@ -547,7 +547,7 @@ public class WebSocketServerTest extends WebSocketTester
public boolean isOpen() public boolean isOpen()
{ {
return handler.getCoreSession().isOpen(); return handler.getCoreSession().isOutputOpen();
} }
} }
} }

View File

@ -18,12 +18,8 @@
package org.eclipse.jetty.websocket.servlet; package org.eclipse.jetty.websocket.servlet;
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
@ -52,6 +48,8 @@ import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
/** /**
* Mapping of pathSpec to a tupple of {@link WebSocketCreator}, {@link FrameHandlerFactory} and * Mapping of pathSpec to a tupple of {@link WebSocketCreator}, {@link FrameHandlerFactory} and
* {@link org.eclipse.jetty.websocket.core.FrameHandler.Customizer}. * {@link org.eclipse.jetty.websocket.core.FrameHandler.Customizer}.
@ -84,7 +82,6 @@ public class WebSocketMapping implements Dumpable, LifeCycle.Listener
} }
private final PathMappings<Negotiator> mappings = new PathMappings<>(); private final PathMappings<Negotiator> mappings = new PathMappings<>();
private final Set<FrameHandlerFactory> frameHandlerFactories = new HashSet<>();
private final Handshaker handshaker = Handshaker.newInstance(); private final Handshaker handshaker = Handshaker.newInstance();
private DecoratedObjectFactory objectFactory; private DecoratedObjectFactory objectFactory;
@ -185,12 +182,6 @@ public class WebSocketMapping implements Dumpable, LifeCycle.Listener
return this.objectFactory; return this.objectFactory;
} }
public void addFrameHandlerFactory(FrameHandlerFactory webSocketServletFrameHandlerFactory)
{
// TODO should this be done by a ServiceLoader?
this.frameHandlerFactories.add(webSocketServletFrameHandlerFactory);
}
/** /**
* Get the matching {@link MappedResource} for the provided target. * Get the matching {@link MappedResource} for the provided target.
* *