Fixes #10353 - Questions about porting WebSocket APIs to jetty-core 12 (#10354)

* Added direct WebSocket upgrade in the Jetty core WebSocket APIs.
* Updated the WebSocket documentation.
* Optimized WebSocketMappings.getMatchedNegotiator() to avoid allocating a lambda for every invocation.
* Cleaned up core.server.WebSocketUpgradeHandler.
* Expanded websocket docs to mention how the demand mechanism works.
* Fixed code examples with correct demand handling.
* Javadocs for api.Callback.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-08-28 17:46:12 +02:00 committed by GitHub
parent 5946503ee0
commit 5a8c5bc8c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1050 additions and 557 deletions

View File

@ -345,8 +345,8 @@
<artifactId>jetty-websocket-jetty-client</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.ee10.websocket</groupId>
<artifactId>jetty-ee10-websocket-jetty-server</artifactId>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>jetty-websocket-jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.ee10.websocket</groupId>

View File

@ -14,12 +14,9 @@
[[pg-server-websocket-configure-filter]]
==== Advanced `WebSocketUpgradeFilter` Configuration
The `WebSocketUpgradeFilter` that handles the HTTP requests that upgrade to WebSocket is installed in these cases:
The `WebSocketUpgradeFilter` that handles the HTTP requests that upgrade to WebSocket is installed by the `JakartaWebSocketServletContainerInitializer`, as described in xref:pg-server-websocket-standard[this section].
* Either by the `JakartaWebSocketServletContainerInitializer`, as described in xref:pg-server-websocket-standard[this section].
* Or by a call to `JettyWebSocketServerContainer.addMapping(\...)`, as described in xref:pg-server-websocket-jetty[this section].
Typically, the `WebSocketUpgradeFilter` is not present in the `web.xml` configuration, and therefore the mechanisms above create a new `WebSocketUpgradeFilter` and install it _before_ any other Filter declared in `web.xml`, under the default name of `"org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter"` and with path mapping `/*`.
Typically, the `WebSocketUpgradeFilter` is not present in the `web.xml` configuration, and therefore the mechanisms above create a new `WebSocketUpgradeFilter` and install it _before_ any other Filter declared in `web.xml`, under the default name of `"org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter"` and with path mapping `/*`.
However, if the `WebSocketUpgradeFilter` is already present in `web.xml` under the default name, then the ``ServletContainerInitializer``s will use that declared in `web.xml` instead of creating a new one.
@ -32,7 +29,7 @@ This allows you to customize:
For example:
[source,xml,subs=verbatim]
[source,xml,subs="verbatim,attributes"]
----
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
@ -44,7 +41,7 @@ For example:
<!-- The CrossOriginFilter *must* be the first --> <!--1-->
<filter>
<filter-name>cross-origin</filter-name>
<filter-class>org.eclipse.jetty.ee9.servlets.CrossOriginFilter</filter-class>
<filter-class>org.eclipse.jetty.{ee-current}.servlets.CrossOriginFilter</filter-class>
<async-supported>true</async-supported>
</filter>
<filter-mapping>
@ -55,8 +52,8 @@ For example:
<!-- Configure the default WebSocketUpgradeFilter --> <!--2-->
<filter>
<!-- The filter name must be the default WebSocketUpgradeFilter name -->
<filter-name>org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter</filter-name> <!--3-->
<filter-class>org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter</filter-class>
<filter-name>org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter</filter-name> <!--3-->
<filter-class>org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter</filter-class>
<!-- Configure at most 1 MiB text messages -->
<init-param> <!--4-->
<param-name>maxTextMessageSize</param-name>
@ -65,7 +62,7 @@ For example:
<async-supported>true</async-supported>
</filter>
<filter-mapping>
<filter-name>org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter</filter-name>
<filter-name>org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter</filter-name>
<!-- Use a more specific path mapping for WebSocket requests -->
<url-pattern>/ws/*</url-pattern> <!--5-->
</filter-mapping>

View File

@ -14,7 +14,7 @@
[[pg-server-websocket-jetty]]
==== Jetty APIs Implementation
When you write a WebSocket application using the Jetty WebSocket APIs, your code typically need to depend on just the Jetty WebSocket APIs to compile your application.
When you write a WebSocket application using the Jetty WebSocket APIs, your code typically needs to depend on just the Jetty WebSocket APIs to compile your application.
However, at runtime you need to have the _implementation_ of the Jetty WebSocket APIs in your class-path (or module-path).
Jetty's WebSocket APIs are provided by the following Maven artifact:
@ -41,150 +41,85 @@ Jetty's implementation of the Jetty WebSocket APIs is provided by the following
[NOTE]
====
The `jetty-websocket-jetty-api` artifact and the `jetty-websocket-jetty-server` artifact (and its transitive dependencies) should be present in the server class-path (or module-path), and never in the web application's `/WEB-INF/lib` directory.
The `jetty-websocket-jetty-api` artifact and the `jetty-websocket-jetty-server` artifact (and its transitive dependencies) should be present in the server class-path (or module-path), and never in a web application's `/WEB-INF/lib` directory.
====
To configure correctly your WebSocket application based on the Jetty WebSocket APIs, you need two steps:
. Make sure that Jetty xref:pg-server-websocket-jetty-container[sets up] an instance of `JettyWebSocketServerContainer`.
. Use the `JettyWebSocketServerContainer` APIs in your applications to xref:pg-server-websocket-jetty-endpoints[register your WebSocket endpoints] that implement your application logic.
. Make sure to xref:pg-server-websocket-jetty-container[set up] an instance of `org.eclipse.jetty.websocket.server.ServerWebSocketContainer`.
. Use the `ServerWebSocketContainer` APIs in your applications to xref:pg-server-websocket-jetty-container-websocket-handler[register the WebSocket endpoints] that implement your application logic.
You can read more about the xref:pg-websocket-architecture[Jetty WebSocket architecture], which is common to both client-side and server-side, to get familiar with the terminology used in the following sections.
[[pg-server-websocket-jetty-container]]
===== Setting up `JettyWebSocketServerContainer`
===== Setting up `ServerWebSocketContainer`
Jetty sets up a `JettyWebSocketServerContainer` instance using `JettyWebSocketServletContainerInitializer`.
You need Jetty to set up a `ServerWebSocketContainer` instance to make your WebSocket applications based on the Jetty WebSocket APIs work.
When you deploy web applications using xref:pg-server-http-handler-use-webapp-context[`WebAppContext`], then `JettyWebSocketServletContainerInitializer` is automatically discovered and initialized by Jetty when the web application starts, so that it sets up the `JettyWebSocketServerContainer`.
In this way, you do not need to write any additional code:
Your WebSocket web application is represented by a `ContextHandler`.
The WebSocket upgrade is performed in a descendant (typically the only child) of the `ContextHandler`, either by the `org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler`, or by a custom `Handler` that you write and is part of your web application.
In both cases, you need to set up a `ServerWebSocketContainer`, and this can be done xref:pg-server-websocket-jetty-container-websocket-handler[implicitly] by using `WebSocketUpgradeHandler`, or xref:pg-server-websocket-jetty-container-websocket-container[explicitly] by creating the `ServerWebSocketContainer` instance.
[[pg-server-websocket-jetty-container-websocket-handler]]
====== Implicit setup using `WebSocketUpgradeHandler`
Using `WebSocketUpgradeHandler` is the most common way to set up your WebSocket applications.
You can use the `WebSocketUpgradeHandler` and the `ServerWebSocketContainer` APIs to map HTTP request URIs to WebSocket endpoints.
When an HTTP request arrives, `WebSocketUpgradeHandler` tests whether it is a WebSocket upgrade request, whether it matches a mapped URI, and if so upgrades the protocol to WebSocket.
From this point on, the communication on the upgraded connection happens with the WebSocket protocol.
This is very similar to what xref:pg-server-websocket-standard-upgrade[`WebSocketUpgradeFilter`] does when using the Jakarta EE WebSocket APIs.
Once you have set up the `WebSocketUpgradeHandler`, you can use the `ServerWebSocketContainer` APIs to configure the WebSocket endpoints.
The example below shows how to set up the `WebSocketUpgradeHandler` and use the `ServerWebSocketContainer` APIs:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=standardContainerWebAppContext]
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerWithUpgradeHandler]
----
On the other hand, when you deploy web applications using xref:pg-server-http-handler-use-servlet-context[`ServletContextHandler`], you have to write the code to ensure that the `JettyWebSocketServletContainerInitializer` is initialized, so that it sets up the `JettyWebSocketServerContainer`:
The mapping of request URIs to WebSocket endpoints is further explained in xref:pg-server-websocket-jetty-pathspec[this section].
[[pg-server-websocket-jetty-container-websocket-container]]
====== Explicit setup using `ServerWebSocketContainer`
A more advanced way to set up your WebSocket applications is to explicitly create the `ServerWebSocketContainer` instance programmatically.
This gives you more flexibility when deciding whether an HTTP request should be upgraded to WebSocket, because you do not need to match request URIs (although you can), nor you need to use `WebSocketUpgradeHandler` (although you can).
Once you have created the `ServerWebSocketContainer`, you can use its APIs to configure the WebSocket endpoints as shown in the example below.
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerServletContextHandler]
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerWithContainer]
----
Calling `JettyWebSocketServletContainerInitializer.configure(\...)` must be done _before_ the `ServletContextHandler` is started, and configures the Jetty WebSocket implementation for that web application context.
Note how the call to `ServerWebSocketContainer.upgrade(\...)` allows you to perform a direct WebSocket upgrade programmatically.
[[pg-server-websocket-jetty-endpoints]]
===== Configuring Endpoints
===== WebSocket Endpoints
Once you have xref:pg-server-websocket-jetty-container[setup] the `JettyWebSocketServerContainer`, you can configure your xref:pg-websocket-endpoints[WebSocket endpoints].
When using the Jetty WebSocket APIs, the WebSocket endpoint classes must be either annotated with the Jetty WebSocket annotations from the `org.eclipse.jetty.websocket.api.annotations` package, or implement the `org.eclipse.jetty.websocket.api.Session.Listener` interface.
Differently from the xref:pg-server-websocket-standard-endpoints[configuration of standard WebSocket endpoints], WebSocket endpoint classes may be annotated with Jetty WebSocket API annotations, or extend the `org.eclipse.jetty.ee9.websocket.api.WebSocketListener` interface, but they are not automatically discovered, not even when deploying web applications using xref:pg-server-http-handler-use-webapp-context[`WebAppContext`].
In the case you want to implement the `Session.Listener` interface, remember that you have to explicitly demand to receive the next WebSocket event.
Use `Session.Listener.AutoDemanding` to automate the demand for simple use cases.
[IMPORTANT]
====
When using the Jetty WebSocket APIs, WebSocket endpoints must always be explicitly configured.
====
Refer to the Jetty WebSocket architecture xref:pg-websocket-endpoints[section] for more information about Jetty WebSocket endpoints and how to correctly deal with the demand for WebSocket events.
There are two ways of configuring WebSocket endpoints when using the Jetty WebSocket APIs:
There is no automatic discovery of WebSocket endpoints; all the WebSocket endpoints of your application must be returned by a `org.eclipse.jetty.websocket.server.WebSocketCreator` that is either mapped to a request URI via `ServerWebSocketContainer.addMapping(\...)`, or directly upgraded via `ServerWebSocketContainer.upgrade(\...)`.
* xref:pg-server-websocket-jetty-endpoints-container[Using `JettyWebSocketServerContainer`], which is very similar to how WebSocket endpoints are configured when using the xref:pg-server-websocket-standard-endpoints[standard `jakarta.websocket` APIs], but also provides APIs to perform a direct, programmatic, WebSocket upgrade.
* xref:pg-server-websocket-jetty-endpoints-servlet[Using `JettyWebSocketServlet`], which may configured in `web.xml`, rather than in Java code.
In the call to `ServerWebSocketContainer.addMapping(\...)`, you can specify a _path spec_ (the first parameter) that can specified as discussed in xref:pg-server-websocket-jetty-pathspec[this section].
[[pg-server-websocket-jetty-endpoints-container]]
====== Using `JettyWebSocketServerContainer`
To register WebSocket endpoints using the Jetty WebSocket APIs you need to access the `JettyWebSocketServerContainer` APIs.
The `JettyWebSocketServerContainer` instance is stored in the `ServletContext`, so it can be retrieved when the `ServletContext` is initialized, either from a `ServletContextListener` or from a `HttpServlet`:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyEndpointsInitialization]
----
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyWebSocketInitializerServlet]
----
You can also use this variant to set up the `JettyWebSocketServerContainer` and configure the WebSocket endpoints in one step:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerAndEndpoints]
----
In the call to `JettyWebSocketServerContainer.addMapping(\...)`, you can specify a _path spec_ (the first parameter) that can be configured as specified in xref:pg-server-websocket-jetty-pathspec[this section].
When the `ServletContextHandler` is started, the `Configurator` lambda (the second parameter passed to `JettyWebSocketServletContainerInitializer.configure(\...)`) is invoked and allows you to explicitly configure the WebSocket endpoints using the Jetty WebSocket APIs provided by `JettyWebSocketServerContainer`.
Under the hood, the call to `JettyWebSocketServerContainer.addMapping(\...)` installs the `org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter`, which is the component that intercepts HTTP requests to upgrade to WebSocket, described in xref:pg-server-websocket-standard-upgrade[this section].
For more information about the configuration of `WebSocketUpgradeFilter` see also xref:pg-server-websocket-configure-filter[this section].
One last alternative to register your WebSocket endpoints is to use a programmatic WebSocket upgrade via `JettyWebSocketServerContainer.upgrade(\...)`, which allows you to use a standard `HttpServlet` subclass (rather than a `JettyWebSocketServlet` as explained in xref:pg-server-websocket-jetty-endpoints-servlet[this section]) to perform a direct WebSocket upgrade when your application logic demands so:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerServletContextHandler]
----
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyContainerUpgrade]
----
When using `JettyWebSocketServerContainer.upgrade(\...)`, the `WebSocketUpgradeFilter` is not installed, since the WebSocket upgrade is performed programmatically.
[[pg-server-websocket-jetty-endpoints-servlet]]
====== Using `JettyWebSocketServlet`
An alternative way to register WebSocket endpoints using the Jetty WebSocket APIs is to use a `JettyWebSocketServlet` subclass (or even many different `JettyWebSocketServlet` subclasses).
This method has the advantage that it does not install the `WebSocketUpgradeFilter` under the hood, because the WebSocket upgrade is handled directly by your `JettyWebSocketServlet` subclass.
This may also have a performance benefit for non-WebSocket HTTP requests (as they will not pass through the `WebSocketUpgradeFilter`).
Your `JettyWebSocketServlet` subclass may be declared and configured either in code or in `web.xml`.
Declaring your `JettyWebSocketServlet` subclass explicitly in code or in `web.xml` also simplifies the declaration and configuration of other web components such as other Servlets and/or Filters (for example, it is easier to configure the `CrossOriginFilter`, see also xref:pg-server-websocket-configure-filter[this section] for more information).
For example, your `JettyWebSocketServlet` subclass may be declared in code in this way:
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyWebSocketServletMain]
----
[source,java,indent=0]
----
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=jettyWebSocketServlet]
----
Note how in the call to `JettyWebSocketServletContainerInitializer.configure(\...)` the second parameter is `null`, because WebSocket endpoints are not created here, but instead by one (or more) `JettyWebSocketServlet` subclasses.
Yet the call is necessary to create other WebSocket implementation components that are necessary also when using `JettyWebSocketServlet` subclasses.
An HTTP upgrade request to WebSocket that matches your `JettyWebSocketServlet` subclass path mapping (specified above via `ServletContextHandler.addServlet(\...)`) arrives at the Servlet and is inspected to verify whether it is a valid upgrade to WebSocket.
If the HTTP request is a valid upgrade to WebSocket, `JettyWebSocketServlet` calls `configure(JettyWebSocketServletFactory factory)` that you have overridden in your subclass, so that your application can instantiate and return the WebSocket endpoint.
After having obtained the WebSocket endpoint, `JettyWebSocketServlet` performs the WebSocket upgrade.
From this point on, the communication happens with the WebSocket protocol, and HTTP components such as Filters and Servlets are not relevant anymore.
If the HTTP request is not an upgrade to WebSocket, `JettyWebSocketServlet` delegates the processing to the superclass, `jakarta.servlet.HttpServlet`, which in turn invokes methods such as `doGet(\...)` or `doPost(\...)` depending on the HTTP method.
If your `JettyWebSocketServlet` subclass did not override the `doXYZ(\...)` method corresponding to the HTTP request, a `405 Method Not Allowed` response is returned to the client, as per the standard `HttpServlet` class implementation.
[NOTE]
====
It is possible to use both `JettyWebSocketServerContainer` and `JettyWebSocketServlet`.
However, it is typically best to avoid mixing the use of `JettyWebSocketServerContainer` with the use of `JettyWebSocketServlet`, so that all your WebSocket endpoints are initialized by the same code in one place only.
====
Using `JettyWebSocketServerContainer.addMapping(\...)` will install the `WebSocketUpgradeFilter` under the hood, which by default will intercepts all HTTP requests to upgrade to WebSocket.
However, as explained in xref:pg-server-websocket-standard-upgrade[this section], if `WebSocketUpgradeFilter` does not find a matching WebSocket endpoint for the request URI path, then the HTTP request is passed to the Filter chain of your web application and may arrive to your `JettyWebSocketServlet` subclass, where it would be processed and possibly result in a WebSocket upgrade.
When the `Server` is started, the lambda passed to `ServerWebSocketContainer.configure(\...)`) is invoked and allows you to explicitly configure the WebSocket endpoints using the Jetty WebSocket APIs provided by `ServerWebSocketContainer`.
[[pg-server-websocket-jetty-pathspec]]
====== Custom PathSpec Mappings
The `JettyWebSocketServerContainer.addMapping(\...)` API maps a _path spec_ to a `JettyWebSocketCreator` instance (typically a lambda expression).
The path spec is matched against the WebSocket upgrade request URI to select the correspondent `JettyWebSocketCreator` to invoke.
The `ServerWebSocketContainer.addMapping(\...)` API maps a _path spec_ to a `WebSocketCreator` instance (typically a lambda expression).
The path spec is matched against the WebSocket upgrade request URI to select the correspondent `WebSocketCreator` to invoke.
The path spec can have these forms:
@ -192,7 +127,7 @@ The path spec can have these forms:
* Regex syntax, specified with `regex|<path spec>`, where the `regex|` prefix can be omitted if the path spec begins with `^` (for example, `+^/ws/[0-9]++`).
* URI template syntax, specified with `uri-template|<path spec>` (for example `+uri-template|/ws/chat/{room}+`).
Within the `JettyWebSocketCreator`, it is possible to access the path spec and, for example in case of URI templates, extract additional information in the following way:
Within the `WebSocketCreator`, it is possible to access the path spec and, for example in case of URI templates, extract additional information in the following way:
[source,java,indent=0]
----

View File

@ -17,54 +17,37 @@
When you write a WebSocket application using the standard `jakarta.websocket` APIs, your code typically need to depend on just the APIs to compile your application.
However, at runtime you need to have an implementation of the standard APIs in your class-path (or module-path).
The standard `jakarta.websocket` APIs are provided by the following Maven artifact:
The standard `jakarta.websocket` APIs, for example for Jakarta {ee-current-caps}, are provided by the following Maven artifact:
[source,xml,subs=normal]
----
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
<version>2.0.0</version>
<version>2.1.0</version>
</dependency>
----
However, the artifact above lacks a proper JPMS `module-info.class` file, and therefore it is a little more difficult to use if you want to use of JPMS for your application.
If you want to use JPMS for your application, you can use this Maven artifact instead:
At runtime, you also need an implementation of the standard Jakarta {ee-current-caps} WebSocket APIs, that Jetty provides with the following Maven artifact (and its transitive dependencies):
[source,xml,subs=normal]
----
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-jakarta-websocket-api</artifactId>
<version>2.0.0</version>
</dependency>
----
This artifact is nothing more than the `jakarta.websocket:jakarta.websocket-api:2.0.0` artifact repackaged with a proper `module-info.class` file.
At runtime, you also need an implementation of the standard `jakarta.websocket` APIs.
Jetty's implementation of the standard `jakarta.websocket` APIs is provided by the following Maven artifact (and its transitive dependencies):
[source,xml,subs=normal]
----
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jakarta-server</artifactId>
<groupId>org.eclipse.jetty.{ee-current}.websocket</groupId>
<artifactId>jetty-{ee-current}-websocket-jakarta-server</artifactId>
<version>{version}</version>
</dependency>
----
[NOTE]
====
The `jakarta.websocket-api` artifact and the `websocket-jakarta-server` artifact (and its transitive dependencies) should be present in the server class-path (or module-path), and never in the web application's `/WEB-INF/lib` directory.
The `jakarta.websocket-api` artifact and the `jetty-{ee-current}-websocket-jakarta-server` artifact (and their transitive dependencies) should be present in the server class-path (or module-path), and never in the web application's `/WEB-INF/lib` directory.
====
To configure correctly your WebSocket application based on the standard `jakarta.websocket` APIs, you need two steps:
To configure correctly your WebSocket application based on the standard Jakarta {ee-current-caps} WebSocket APIs, you need two steps:
. Make sure that Jetty xref:pg-server-websocket-standard-container[sets up] an instance of `jakarta.websocket.server.ServerContainer`.
. xref:pg-server-websocket-standard-endpoints[Configure] the WebSocket endpoints that implement your application logic, either by annotating their classes with the standard `jakarta.websocket` annotations, or by using the `ServerContainer` APIs to register them in your code.
. Make sure that Jetty sets up an instance of `jakarta.websocket.server.ServerContainer`, described in xref:pg-server-websocket-standard-container[this section].
. Configure the WebSocket endpoints that implement your application logic, either by annotating their classes with the standard `jakarta.websocket` annotations, or by using the `ServerContainer` APIs to register them in your code, described in xref:pg-server-websocket-standard-endpoints[this section].
[[pg-server-websocket-standard-container]]
===== Setting Up `ServerContainer`
@ -86,7 +69,7 @@ On the other hand, when you deploy web applications using xref:pg-server-http-ha
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=standardContainerServletContextHandler]
----
Calling `JakartaWebSocketServletContainerInitializer.configure(\...)` must be done _before_ the `ServletContextHandler` is started, and configures the `jakarta.websocket` implementation for that web application context.
Calling `JakartaWebSocketServletContainerInitializer.configure(\...)` must be done _before_ the `ServletContextHandler` is started, and configures the Jakarta {ee-current-caps} WebSocket implementation for that web application context, making `ServerContainer` available to web applications.
[[pg-server-websocket-standard-endpoints]]
===== Configuring Endpoints
@ -100,7 +83,7 @@ In this way, you do not need to write any additional code; you just need to ensu
On the other hand, when you deploy web applications using xref:pg-server-http-handler-use-webapp-context[`WebAppContext`] but you need to perform more advanced configuration of the `ServerContainer` or of the WebSocket endpoints, or when you deploy web applications using xref:pg-server-http-handler-use-servlet-context[`ServletContextHandler`], you need to access the `ServerContainer` APIs.
The `ServerContainer` instance is stored as a `ServletContext` attribute, so it can be retrieved when the `ServletContext` is initialized, either from a `ServletContextListener` or from a `HttpServlet`:
The `ServerContainer` instance is stored as a `ServletContext` attribute, so it can be retrieved when the `ServletContext` is initialized, either from a `ServletContextListener`, or from a Servlet `Filter`, or from an `HttpServlet`:
[source,java,indent=0]
----
@ -112,7 +95,7 @@ include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/We
include::../../{doc_code}/org/eclipse/jetty/docs/programming/server/websocket/WebSocketServerDocs.java[tags=standardWebSocketInitializerServlet]
----
When you deploy web applications using xref:pg-server-http-handler-use-servlet-context[`ServletContextHandler`], you can also use this variant to set up the `ServerContainer` and configure the WebSocket endpoints in one step:
When you deploy web applications using xref:pg-server-http-handler-use-servlet-context[`ServletContextHandler`], you can alternatively use the code below to set up the `ServerContainer` and configure the WebSocket endpoints in one step:
[source,java,indent=0]
----
@ -124,11 +107,11 @@ When the `ServletContextHandler` is started, the `Configurator` lambda (the seco
[[pg-server-websocket-standard-upgrade]]
====== Upgrade to WebSocket
Under the hood, `JakartaWebSocketServletContainerInitializer` installs the `org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter`, which is the component that intercepts HTTP requests to upgrade to WebSocket, and performs the upgrade from the HTTP protocol to the WebSocket protocol.
Under the hood, `JakartaWebSocketServletContainerInitializer` installs the `org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter`, which is the component that intercepts HTTP requests to upgrade to WebSocket, and performs the upgrade from the HTTP protocol to the WebSocket protocol.
[NOTE]
====
The `WebSocketUpgradeFilter` is installed under the filter name corresponding to its class name (that is, the string `"org.eclipse.jetty.websocket.servlet.WebSocketUpgradeFilter"`) and with a filter mapping of `/*`.
The `WebSocketUpgradeFilter` is installed under the filter name corresponding to its class name (that is, the string `"org.eclipse.jetty.{ee-current}.websocket.servlet.WebSocketUpgradeFilter"`) and with a filter mapping of `/*`.
Refer to the xref:pg-server-websocket-configure-filter[advanced `WebSocketUpgradeFilter` configuration section] for more information.
====

View File

@ -14,27 +14,34 @@
[[pg-server-websocket]]
=== WebSocket Server
Jetty provides two API implementations of the WebSocket protocol:
Jetty provides different implementations of the WebSocket protocol:
* An implementation for the standard `jakarta.websocket` APIs provided by link:https://jakarta.ee/specifications/websocket/2.0/[Jakarta WebSocket 2.0], described in xref:pg-server-websocket-standard[this section].
* An implementation for Jetty-specific WebSocket APIs, described in xref:pg-server-websocket-jetty[this section].
* A Jakarta EE 8 (`javax.websocket`) implementation, based on the link:https://jakarta.ee/specifications/websocket/1.1/[Jakarta WebSocket 1.1 Specification].
* A Jakarta EE 9 (`jakarta.websocket`) implementation, based on the link:https://jakarta.ee/specifications/websocket/2.0/[Jakarta WebSocket 2.0 Specification].
* A Jakarta EE 10 (`jakarta.websocket`) implementation, based on the link:https://jakarta.ee/specifications/websocket/2.1/[Jakarta WebSocket 2.1 Specification].
* A Jetty specific implementation, based on the Jetty WebSocket APIs, that does not depend on any Jakarta EE APIs.
Using the standard `jakarta.websocket` APIs allows your applications to depend only on standard APIs, and your applications may be deployed in any compliant WebSocket Container that supports Jakarta WebSocket 2.0.
The Jakarta EE implementations and APIs are described in xref:pg-server-websocket-standard[this section].
The standard APIs provide these features that are not present in the Jetty WebSocket APIs:
Using the standard Jakarta EE WebSocket APIs allows your applications to depend only on standard APIs, and your applications may be deployed in any compliant WebSocket Container that supports Jakarta WebSocket.
The standard Jakarta EE WebSocket APIs provide these features that are not present in the Jetty WebSocket APIs:
* Encoders and Decoders for automatic conversion of text or binary messages to objects.
On the other hand, the Jetty WebSocket APIs are more efficient and offer greater and more fine-grained control, and provide these features that are not present in the standard APIs:
The Jetty specific WebSocket implementation and APIs are described in xref:pg-server-websocket-jetty[this section].
* Suspend/resume to control backpressure.
Using the Jetty WebSocket APIs allows your applications to be more efficient and offer greater and more fine-grained control, and provide these features that are not present in the Jakarta EE WebSocket APIs:
* A demand mechanism to control backpressure.
* Remote socket address (IP address and port) information.
* WebSocket upgrade handling via Filter or Servlet.
* Advanced URI matching with Servlet WebSocket upgrade.
* Configuration of the network buffer capacity.
* Advanced request URI matching with regular expressions, in addition to Servlet patterns and URI template patterns.
* More configuration options, for example the network buffer capacity.
* Programmatic WebSocket upgrade, in addition to WebSocket upgrade based on URI matching, for maximum flexibility.
If your application needs specific features that are not provided by the standard APIs, the Jetty WebSocket APIs may provide such features -- and if they do not, you may ask for these features by submitting an issue to the Jetty Project without waiting for the standard process to approve them.
If your application needs specific features that are not provided by the standard APIs, the Jetty WebSocket APIs may provide such features.
TIP: If the feature you are looking for is not present, you may ask for these features by link:https://github.com/eclipse/jetty.project/issues[submitting an issue] to the Jetty Project without waiting for the standard Jakarta EE process to approve them and release a new version of the Jakarta EE WebSocket specification.
include::server-websocket-standard.adoc[]
include::server-websocket-jetty.adoc[]
include::server-websocket-filter.adoc[]
include::server-websocket-jetty.adoc[]

View File

@ -55,9 +55,72 @@ Applications interested in this type of messages receive a `String` representing
** BINARY.
Applications interested in this type of messages receive a `ByteBuffer` representing the raw bytes received.
Only one thread at a time will be delivering frame or message events to the corresponding methods; the next frame or message event will not be delivered until the previous call to the corresponding method has exited, and if there is demand for it.
xref:pg-websocket-endpoints-listener[Listener endpoints] are notified of events by invoking the correspondent method defined by the `org.eclipse.jetty.websocket.api.Session.Listener` interface.
xref:pg-websocket-endpoints-annotated[Annotated endpoints] are notified of events by invoking the correspondent method annotated with the correspondent annotation from the `+org.eclipse.jetty.websocket.api.annotations.*+` package.
For both types of WebSocket endpoints, only one thread at a time will be delivering frame or message events to the corresponding methods; the next frame or message event will not be delivered until the previous call to the corresponding method has exited, and if there is xref:pg-websocket-endpoints-demand[demand] for it.
Endpoints will always be notified of message events in the same order they were received over the network.
[[pg-websocket-endpoints-demand]]
===== WebSocket Events Demand
In order to receive WebSocket events, you must _demand_ for them; the only exception is the _open_ event, because it is the initial event that applications can interact with.
When a WebSocket event is received by an endpoint, the demand for WebSocket events (for that endpoint) is reset, so that no more WebSocket events will be received by the endpoint.
It is responsibility of the endpoint to demand to receive more WebSocket events.
For simple cases, you can just annotate your WebSocket endpoint with `@WebSocket(autoDemand = true)`, or implement `Session.Listener.AutoDemanding`.
In these two cases, when a method that receives a WebSocket event returns, the Jetty implementation automatically demands for another WebSocket event.
For example:
[source,java,indent=0]
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=autoDemand]
----
While auto-demand works for simple cases, it may not work in all cases, especially those where the method that receives the WebSocket event performs asynchronous operations.
The following example shows the problem:
[source,java,indent=0]
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=autoDemandWrong]
----
Note how, in the example above, auto-demanding has the problem that receiving WebSocket text messages may happen faster than echoing them back, because the call to `sendText(\...)` may return almost immediately but be slow to complete because it is asynchronous.
In the example above, if another WebSocket text message arrives, and the `sendText(\...)` operation is not complete, a `WritePendingException` will be thrown.
In other cases, this may lead to infinite buffering of data, eventually causing ``OutOfMemoryError``s, and in general excessive resource consumption that may be difficult to diagnose and troubleshoot.
For more information, see also the xref:pg-websocket-session-send[section about sending data].
[CAUTION]
====
Always be careful when using auto-demand.
Analyze the operations that your endpoint performs and make sure they complete synchronously within the method.
====
To solve the problem outlined above, you must explicitly demand for the next WebSocket event, only when the processing of the previous events is complete.
For example:
[source,java,indent=0]
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=explicitDemand]
----
Note how it is necessary to invoke `Session.demand()` from the _open_ event, in order to receive _message_ events.
Furthermore, note how every time a text message is received, a possibly slow asynchronous operation is initiated (which returns almost immediately, although it may not be completed yet) and then the method returns.
Because there is no demand when the method returns (because the asynchronous operation is not completed yet), the implementation will not notify any other WebSocket event (not even _frame_, _close_ or _error_ events).
When the asynchronous operation completes successfully the callback is notified; this, in turn, invokes `Session.demand()`, and the implementation may notify another WebSocket event (if any) to the WebSocket endpoint.
[[pg-websocket-endpoints-listener]]
===== Listener Endpoints
@ -67,7 +130,6 @@ A WebSocket endpoint may implement the `org.eclipse.jetty.websocket.api.Session.
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=listenerEndpoint]
----
<1> Your listener class implements `Session.Listener`.
====== Message Streaming Reads
@ -76,7 +138,7 @@ For large WebSocket messages, the memory usage may be large due to the fact that
To stream textual or binary messages, you override either `org.eclipse.jetty.websocket.api.Session.Listener.onWebSocketPartialText(\...)` or `org.eclipse.jetty.websocket.api.Session.Listener.onWebSocketPartialBinary(\...)`.
These methods that receive _chunks_ of, respectively, text and bytes that form the whole WebSocket message.
These methods receive _chunks_ of, respectively, text and bytes that form the whole WebSocket message.
You may accumulate the chunks yourself, or process each chunk as it arrives, or stream the chunks elsewhere, for example:
@ -88,39 +150,33 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=s
[[pg-websocket-endpoints-annotated]]
===== Annotated Endpoints
A WebSocket endpoint may annotate methods with `org.eclipse.jetty.websocket.api.annotations.*` annotations to receive WebSocket events.
A WebSocket endpoint may annotate methods with `+org.eclipse.jetty.websocket.api.annotations.*+` annotations to receive WebSocket events.
Each annotated method may take an optional `Session` argument as its first parameter:
[source,java,indent=0]
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=annotatedEndpoint]
----
<1> Use the `@WebSocket` annotation at the class level to make it a WebSocket endpoint.
<1> Use the `@WebSocket` annotation at the class level to make it a WebSocket endpoint, and disable auto-demand.
<2> Use the `@OnWebSocketOpen` annotation for the _open_ event.
As this is the first event notified to the endpoint, you can configure the `Session` object.
<3> Use the `@OnWebSocketClose` annotation for the _close_ event.
The method may take an optional `Session` as first parameter.
<3> Use the `@OnWebSocketMessage` annotation for the _message_ event, both for textual and binary messages.
<4> Use the `@OnWebSocketError` annotation for the _error_ event.
The method may take an optional `Session` as first parameter.
<5> Use the `@OnWebSocketMessage` annotation for the _message_ event, both for textual and binary messages.
The method may take an optional `Session` as first parameter.
[NOTE]
====
For binary messages, you may declare the annotated method with this signature:
[source,java]
----
@OnWebSocketMessage
public void methodName(ByteBuffer buffer, Callback callback) { ... }
----
====
<5> Use the `@OnWebSocketClose` annotation for the _close_ event.
[[pg-websocket-endpoints-annotated-streaming]]
====== Message Streaming Reads
If you need to deal with large WebSocket messages, you may reduce the memory usage by streaming the message content.
To stream textual or binary messages, you still use the `@OnWebSocketMessage` annotation, but you change the signature of the method to take, respectively a `Reader` and an `InputStream`:
To stream textual or binary messages, you still use the `@OnWebSocketMessage` annotation, but you change the signature of the method to take an additional `boolean` parameter:
[source,java,indent=0]
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=partialAnnotatedEndpoint]
----
Alternatively, but less efficiently, you can use the `@OnWebSocketMessage` annotation, but you change the signature of the method to take, respectively, a `Reader` and an `InputStream`:
[source,java,indent=0]
----
@ -132,6 +188,9 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=s
`Reader` or `InputStream` only offer blocking APIs, so if the remote peers are slow in sending the large WebSocket messages, reading threads may be blocked in `Reader.read(char[])` or `InputStream.read(byte[])`, possibly exhausting the thread pool.
====
Note that when you use blocking APIs, the invocations to `Session.demand()` are now performed by the `Reader` or `InputStream` implementations (as well as the `ByteBuffer` lifecycle management).
You indirectly control the demand by deciding when to read from `Reader` or `InputStream`.
[[pg-websocket-session]]
==== WebSocket Session
@ -186,16 +245,13 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=s
<2> Note how the second send must be performed from inside the callback.
<3> Sequential sends may throw `WritePendingException`.
// TODO: rewrite this section in light of maxOutgoingFrames.
Non-blocking APIs are more difficult to use since you are required to meet the following condition:
[IMPORTANT]
====
You cannot initiate another send of any kind until the previous send is completed.
Non-blocking APIs are more difficult to use since you are required to meet the following condition:
For example, if you have initiated a text send, you cannot initiate a binary send, until the previous send has completed.
* You cannot initiate another send of any kind until the previous send is completed.
Furthermore, if you have initiated a non-blocking send, you cannot initiate a blocking send, until the previous send has completed.
For example, if you have initiated a text send, you cannot initiate another text or binary send, until the previous send has completed.
====
This requirement is necessary to avoid unbounded buffering that could lead to ``OutOfMemoryError``s.
@ -206,7 +262,7 @@ We strongly recommend that you follow the condition above.
However, there may be cases where you want to explicitly control the number of outgoing buffered messages using `RemoteEndpoint.setMaxOutgoingFrames(int)`.
Remember that trying to control the number of outgoing buffered messages is very difficult and tricky; you may set `maxOutgoingFrames=4` and have a situation where 6 threads try to concurrently send messages: threads 1 to 4 will be able to successfully buffer their messages, thread 5 may fail, but thread 6 may succeed because one of the previous threads completed its send.
Remember that trying to control the number of outgoing frames is very difficult and tricky; you may set `maxOutgoingFrames=4` and have a situation where 6 threads try to concurrently send messages: threads 1 to 4 will be able to successfully buffer their messages, thread 5 may fail, but thread 6 may succeed because one of the previous threads completed its send.
At this point you have an out-of-order message delivery that could be unexpected and very difficult to troubleshoot because it will happen non-deterministically.
====
@ -217,9 +273,9 @@ While non-blocking APIs are more difficult to use, they don't block the sender t
If you need to send large WebSocket messages, you may reduce the memory usage by streaming the message content.
The APIs offer `sendPartial*(\...)` methods that allow you to send a chunk of the whole message at a time, therefore reducing the memory usage since it is not necessary to have the whole message `String` or `ByteBuffer` in memory to send it.
The Jetty WebSocket APIs offer `sendPartial*(\...)` methods that allow you to send a chunk of the whole message at a time, therefore reducing the memory usage since it is not necessary to have the whole message `String` or `ByteBuffer` in memory to send it.
The APIs for streaming the message content are non-blocking and therefore slightly more complicated to use, as you should wait (without blocking!) for the callbacks to complete.
The Jetty WebSocket APIs for streaming the message content are non-blocking and therefore you should wait (without blocking!) for the callbacks to complete.
Fortunately, Jetty provides the `IteratingCallback` utility class (described in more details xref:pg-arch-io-echo[in this section]) which greatly simplify the use of non-blocking APIs:
@ -227,9 +283,11 @@ Fortunately, Jetty provides the `IteratingCallback` utility class (described in
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=streamSendNonBlocking]
----
<1> Implementing `Callback` allows to pass `this` to `sendPartialBinary(...)`.
<2> The `process()` method is called iteratively when each `sendPartialBinary(...)` is completed.
<1> Implementing `Callback` allows to pass `this` to `sendPartialBinary(\...)`.
<2> The `process()` method is called iteratively when each `sendPartialBinary(\...)` is completed.
<3> Sends the message chunks.
<4> When the last chunk as been sent, complete successfully the `IteratingCallback`.
<5> Only when the `IteratingCallback` is completed successfully, demand for more WebSocket events.
[[pg-websocket-session-ping]]
===== Sending Ping/Pong
@ -252,7 +310,6 @@ To handle `PING`/`PONG` events, you may implement methods `Session.Listener.onWe
----
include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=pingPongListener]
----
<1> The WebSocket endpoint class must implement `Session.Listener`
[[pg-websocket-session-close]]
===== Closing the Session

View File

@ -16,8 +16,8 @@ package org.eclipse.jetty.docs.programming;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.NanoTime;
@ -30,12 +30,116 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import static java.lang.System.Logger.Level.INFO;
@SuppressWarnings("unused")
public class WebSocketDocs
{
@SuppressWarnings("InnerClassMayBeStatic")
// tag::autoDemand[]
// Attribute autoDemand is true by default.
@WebSocket(autoDemand = true)
public class AutoDemandAnnotatedEndPoint
{
@OnWebSocketOpen
public void onOpen(Session session)
{
// No need to demand here, because this endpoint is auto-demanding.
}
@OnWebSocketMessage
public void onText(String message)
{
System.getLogger("ws.message").log(INFO, message);
// No need to demand here, because this endpoint is auto-demanding.
}
}
public class AutoDemandListenerEndPoint implements Session.Listener.AutoDemanding
{
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
// No need to demand here, because this endpoint is auto-demanding.
}
@Override
public void onWebSocketText(String message)
{
System.getLogger("ws.message").log(INFO, message);
// No need to demand here, because this endpoint is auto-demanding.
}
}
// end::autoDemand[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::autoDemandWrong[]
public class WrongAutoDemandListenerEndPoint implements Session.Listener.AutoDemanding
{
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
// No need to demand here, because this endpoint is auto-demanding.
}
@Override
public void onWebSocketText(String message)
{
// Perform an asynchronous operation, such as invoking
// a third party service or just echoing the message back.
session.sendText(message, Callback.NOOP);
// Returning from this method will automatically demand,
// so this method may be entered again before sendText()
// has been completed, causing a WritePendingException.
}
}
// end::autoDemandWrong[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::explicitDemand[]
public class ExplicitDemandListenerEndPoint implements Session.Listener
{
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
// Explicitly demand here, otherwise no other event is received.
session.demand();
}
@Override
public void onWebSocketText(String message)
{
// Perform an asynchronous operation, such as invoking
// a third party service or just echoing the message back.
// We want to demand only when sendText() has completed,
// which is notified to the callback passed to sendText().
session.sendText(message, Callback.from(session::demand, failure ->
{
// Handle the failure, in this case just closing the session.
session.close(StatusCode.SERVER_ERROR, "failure", Callback.NOOP);
}));
// Return from the method without demanding yet,
// waiting for the completion of sendText() to demand.
}
}
// end::explicitDemand[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::listenerEndpoint[]
public class ListenerEndPoint implements Session.Listener // <1>
public class ListenerEndPoint implements Session.Listener
{
private Session session;
@ -51,16 +155,53 @@ public class WebSocketDocs
session.setMaxTextMessageSize(16 * 1024);
// You may immediately send a message to the remote peer.
session.sendText("connected", Callback.NOOP);
session.sendText("connected", Callback.from(session::demand, Throwable::printStackTrace));
}
@Override
public void onWebSocketClose(int statusCode, String reason)
public void onWebSocketText(String message)
{
// The WebSocket endpoint has been closed.
// A WebSocket text message is received.
// You may dispose resources.
disposeResources();
// You may echo it back if it matches certain criteria.
if (message.startsWith("echo:"))
{
// Only demand for more events when sendText() is completed successfully.
session.sendText(message.substring("echo:".length()), Callback.from(session::demand, Throwable::printStackTrace));
}
else
{
// Discard the message, and demand for more events.
session.demand();
}
}
@Override
public void onWebSocketBinary(ByteBuffer payload, Callback callback)
{
// A WebSocket binary message is received.
// Save only PNG images.
boolean isPNG = true;
byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'};
for (int i = 0; i < pngBytes.length; ++i)
{
if (pngBytes[i] != payload.get(i))
{
// Not a PNG image.
isPNG = false;
break;
}
}
if (isPNG)
savePNGImage(payload);
// Complete the callback to release the payload ByteBuffer.
callback.succeed();
// Demand for more events.
session.demand();
}
@Override
@ -76,29 +217,12 @@ public class WebSocketDocs
}
@Override
public void onWebSocketText(String message)
public void onWebSocketClose(int statusCode, String reason)
{
// A WebSocket textual message is received.
// The WebSocket endpoint has been closed.
// You may echo it back if it matches certain criteria.
if (message.startsWith("echo:"))
session.sendText(message.substring("echo:".length()), Callback.NOOP);
}
@Override
public void onWebSocketBinary(ByteBuffer payload, Callback callback)
{
// A WebSocket binary message is received.
// Save only PNG images.
byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'};
for (int i = 0; i < pngBytes.length; ++i)
{
if (pngBytes[i] != payload.get(i))
return;
}
savePNGImage(payload);
callback.succeed();
// You may dispose resources.
disposeResources();
}
}
// end::listenerEndpoint[]
@ -107,13 +231,28 @@ public class WebSocketDocs
// tag::streamingListenerEndpoint[]
public class StreamingListenerEndpoint implements Session.Listener
{
private Path textPath;
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
session.demand();
}
@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
// Forward chunks to external REST service.
forwardToREST(payload, fin);
// Forward chunks to external REST service, asynchronously.
// Only demand when the forwarding completed successfully.
CompletableFuture<Void> result = forwardToREST(payload, fin);
result.whenComplete((ignored, failure) ->
{
if (failure == null)
session.demand();
else
failure.printStackTrace();
});
}
@Override
@ -121,41 +260,77 @@ public class WebSocketDocs
{
// Save chunks to file.
appendToFile(payload, fin);
// Complete the callback.
// Complete the callback to release the payload ByteBuffer.
callback.succeed();
// Demand for more events.
session.demand();
}
}
// end::streamingListenerEndpoint[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::annotatedEndpoint[]
@WebSocket // <1>
@WebSocket(autoDemand = false) // <1>
public class AnnotatedEndPoint
{
private Session session;
@OnWebSocketOpen // <2>
public void onOpen(Session session)
{
// The WebSocket endpoint has been opened.
// Store the session to be able to send data to the remote peer.
this.session = session;
// You may configure the session.
session.setMaxTextMessageSize(16 * 1024);
// You may immediately send a message to the remote peer.
session.sendText("connected", Callback.NOOP);
session.sendText("connected", Callback.from(session::demand, Throwable::printStackTrace));
}
@OnWebSocketClose // <3>
public void onClose(int statusCode, String reason)
@OnWebSocketMessage // <3>
public void onTextMessage(Session session, String message)
{
// The WebSocket endpoint has been closed.
// A WebSocket textual message is received.
// You may dispose resources.
disposeResources();
// You may echo it back if it matches certain criteria.
if (message.startsWith("echo:"))
{
// Only demand for more events when sendText() is completed successfully.
session.sendText(message.substring("echo:".length()), Callback.from(session::demand, Throwable::printStackTrace));
}
else
{
// Discard the message, and demand for more events.
session.demand();
}
}
@OnWebSocketMessage // <3>
public void onBinaryMessage(Session session, ByteBuffer payload, Callback callback)
{
// A WebSocket binary message is received.
// Save only PNG images.
boolean isPNG = true;
byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'};
for (int i = 0; i < pngBytes.length; ++i)
{
if (pngBytes[i] != payload.get(i))
{
// Not a PNG image.
isPNG = false;
break;
}
}
if (isPNG)
savePNGImage(payload);
// Complete the callback to release the payload ByteBuffer.
callback.succeed();
// Demand for more events.
session.demand();
}
@OnWebSocketError // <4>
@ -170,33 +345,50 @@ public class WebSocketDocs
disposeResources();
}
@OnWebSocketMessage // <5>
public void onTextMessage(Session session, String message) // <3>
@OnWebSocketClose // <5>
public void onClose(int statusCode, String reason)
{
// A WebSocket textual message is received.
// The WebSocket endpoint has been closed.
// You may echo it back if it matches certain criteria.
if (message.startsWith("echo:"))
session.sendText(message.substring("echo:".length()), Callback.NOOP);
}
@OnWebSocketMessage // <5>
public void onBinaryMessage(ByteBuffer payload, Callback callback)
{
// A WebSocket binary message is received.
// Save only PNG images.
byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'};
for (int i = 0; i < pngBytes.length; ++i)
{
if (pngBytes[i] != payload.get(i))
return;
}
savePNGImage(payload);
// You may dispose resources.
disposeResources();
}
}
// end::annotatedEndpoint[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::partialAnnotatedEndpoint[]
@WebSocket(autoDemand = false)
public class PartialAnnotatedEndpoint
{
@OnWebSocketMessage
public void onTextMessage(Session session, String partialText, boolean fin)
{
// Forward the partial text.
// Demand only when the forward completed.
CompletableFuture<Void> result = forwardToREST(partialText, fin);
result.whenComplete((ignored, failure) ->
{
if (failure == null)
session.demand();
else
failure.printStackTrace();
});
}
@OnWebSocketMessage
public void onBinaryMessage(Session session, ByteBuffer partialPayload, boolean fin, Callback callback)
{
// Save partial payloads to file.
appendToFile(partialPayload, fin);
// Complete the callback to release the payload ByteBuffer.
callback.succeed();
// Demand for more events.
session.demand();
}
}
// end::partialAnnotatedEndpoint[]
@SuppressWarnings("InnerClassMayBeStatic")
// tag::streamingAnnotatedEndpoint[]
@WebSocket
@ -205,14 +397,16 @@ public class WebSocketDocs
@OnWebSocketMessage
public void onTextMessage(Reader reader)
{
// Read chunks and forward.
// Read from the Reader and forward.
// Caution: blocking APIs.
forwardToREST(reader);
}
@OnWebSocketMessage
public void onBinaryMessage(InputStream stream)
{
// Save chunks to file.
// Read from the InputStream and save to file.
// Caution: blocking APIs.
appendToFile(stream);
}
}
@ -230,6 +424,9 @@ public class WebSocketDocs
// Configure the idle timeout.
session.setIdleTimeout(Duration.ofSeconds(30));
// Demand for more events.
session.demand();
}
}
// end::sessionConfigure[]
@ -281,7 +478,7 @@ public class WebSocketDocs
@SuppressWarnings("InnerClassMayBeStatic")
// tag::streamSendNonBlocking[]
@WebSocket
@WebSocket(autoDemand = false)
public class StreamSendNonBlockingEndpoint
{
@OnWebSocketMessage
@ -292,6 +489,7 @@ public class WebSocketDocs
private class Sender extends IteratingCallback implements Callback // <1>
{
private final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
private final Session session;
private boolean finished;
@ -304,20 +502,20 @@ public class WebSocketDocs
protected Action process() throws Throwable // <2>
{
if (finished)
return Action.SUCCEEDED;
return Action.SUCCEEDED; // <4>
ByteBuffer chunk = readChunkToSend();
if (chunk == null)
int read = readChunkToSendInto(byteBuffer);
if (read < 0)
{
// No more bytes, finish the WebSocket message.
session.sendPartialBinary(ByteBuffer.allocate(0), true, this); // <3>
// No more bytes to send, finish the WebSocket message.
session.sendPartialBinary(byteBuffer, true, this); // <3>
finished = true;
return Action.SCHEDULED;
}
else
{
// Send the chunk.
session.sendPartialBinary(ByteBuffer.allocate(0), false, this); // <3>
session.sendPartialBinary(byteBuffer, false, this); // <3>
return Action.SCHEDULED;
}
}
@ -336,6 +534,12 @@ public class WebSocketDocs
failed(x);
}
@Override
protected void onCompleteSuccess()
{
session.demand(); // <5>
}
@Override
protected void onCompleteFailure(Throwable x)
{
@ -347,14 +551,19 @@ public class WebSocketDocs
@SuppressWarnings("InnerClassMayBeStatic")
// tag::pingPongListener[]
public class RoundTripListenerEndpoint implements Session.Listener // <1>
public class RoundTripListenerEndpoint implements Session.Listener
{
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
// Send to the remote peer the local nanoTime.
ByteBuffer buffer = ByteBuffer.allocate(8).putLong(NanoTime.now()).flip();
session.sendPing(buffer, Callback.NOOP);
// Demand for more events.
session.demand();
}
@Override
@ -365,6 +574,9 @@ public class WebSocketDocs
// Calculate the round-trip time.
long roundTrip = NanoTime.since(start);
// Demand for more events.
session.demand();
}
}
// end::pingPongListener[]
@ -383,8 +595,9 @@ public class WebSocketDocs
}
// end::sessionClose[]
private static void forwardToREST(String payload, boolean fin)
private static CompletableFuture<Void> forwardToREST(String payload, boolean fin)
{
return null;
}
private static void forwardToREST(Reader reader)
@ -412,8 +625,8 @@ public class WebSocketDocs
return null;
}
private static ByteBuffer readChunkToSend()
private static int readChunkToSendInto(ByteBuffer byteBuffer)
{
return null;
return 0;
}
}

View File

@ -13,14 +13,11 @@
package org.eclipse.jetty.docs.programming.server.websocket;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.websocket.DeploymentException;
import jakarta.websocket.server.ServerContainer;
import jakarta.websocket.server.ServerEndpoint;
@ -28,15 +25,18 @@ import jakarta.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.webapp.WebAppContext;
import org.eclipse.jetty.ee10.websocket.jakarta.server.config.JakartaWebSocketServletContainerInitializer;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketCreator;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServletFactory;
import org.eclipse.jetty.ee10.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.server.ServerWebSocketContainer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
@SuppressWarnings("unused")
public class WebSocketServerDocs
@ -162,180 +162,109 @@ public class WebSocketServerDocs
// end::standardContainerAndEndpoints[]
}
public void jettyContainerServletContextHandler() throws Exception
public void jettyContainerWithUpgradeHandler() throws Exception
{
// tag::jettyContainerServletContextHandler[]
// tag::jettyContainerWithUpgradeHandler[]
// Create a Server with a ServerConnector listening on port 8080.
Server server = new Server(8080);
// Create a ServletContextHandler with the given context path.
ServletContextHandler handler = new ServletContextHandler("/ctx");
server.setHandler(handler);
// Create a ContextHandler with the given context path.
ContextHandler contextHandler = new ContextHandler("/ctx");
server.setHandler(contextHandler);
// Ensure that JettyWebSocketServletContainerInitializer is initialized,
// to setup the JettyWebSocketServerContainer for this web application context.
JettyWebSocketServletContainerInitializer.configure(handler, null);
// Create a WebSocketUpgradeHandler that implicitly creates a ServerWebSocketContainer.
WebSocketUpgradeHandler webSocketHandler = WebSocketUpgradeHandler.from(server, contextHandler);
contextHandler.setHandler(webSocketHandler);
// Starting the Server will start the ServletContextHandler.
server.start();
// end::jettyContainerServletContextHandler[]
}
public void jettyEndpointsInitialization() throws Exception
{
// tag::jettyEndpointsInitialization[]
// Create a Server with a ServerConnector listening on port 8080.
Server server = new Server(8080);
// Create a ServletContextHandler with the given context path.
ServletContextHandler handler = new ServletContextHandler("/ctx");
server.setHandler(handler);
// Ensure that JettyWebSocketServletContainerInitializer is initialized,
// to setup the JettyWebSocketServerContainer for this web application context.
JettyWebSocketServletContainerInitializer.configure(handler, null);
// Add a WebSocket-initializer Servlet to register WebSocket endpoints.
handler.addServlet(MyJettyWebSocketInitializerServlet.class, "/*");
// Starting the Server will start the ServletContextHandler.
server.start();
// end::jettyEndpointsInitialization[]
}
@SuppressWarnings("InnerClassMayBeStatic")
// tag::jettyWebSocketInitializerServlet[]
public class MyJettyWebSocketInitializerServlet extends HttpServlet
{
@Override
public void init() throws ServletException
// Here you can access the ServerWebSocketContainer through the WebSocketUpgradeHandler APIs.
webSocketHandler.configure(container ->
{
// Retrieve the JettyWebSocketServerContainer.
JettyWebSocketServerContainer container = JettyWebSocketServerContainer.getContainer(getServletContext());
// Configure the JettyWebSocketServerContainer.
// Configure the ServerWebSocketContainer.
container.setMaxTextMessageSize(128 * 1024);
// Simple registration of your WebSocket endpoints.
container.addMapping("/ws/myURI", MyJettyWebSocketEndPoint.class);
// Map a request URI to a WebSocket endpoint, for example using a regexp.
container.addMapping("regex|/ws/v\\d+/echo", (rq, rs, cb) -> new EchoEndPoint());
// Advanced registration of your WebSocket endpoints.
container.addMapping("/ws/myOtherURI", (upgradeRequest, upgradeResponse) ->
new MyOtherJettyWebSocketEndPoint()
);
}
}
// end::jettyWebSocketInitializerServlet[]
public void jettyContainerAndEndpoints() throws Exception
{
// tag::jettyContainerAndEndpoints[]
// Create a Server with a ServerConnector listening on port 8080.
Server server = new Server(8080);
// Create a ServletContextHandler with the given context path.
ServletContextHandler handler = new ServletContextHandler("/ctx");
server.setHandler(handler);
// Setup the JettyWebSocketServerContainer and the WebSocket endpoints for this web application context.
JettyWebSocketServletContainerInitializer.configure(handler, (servletContext, container) ->
{
// Configure the ServerContainer.
container.setMaxTextMessageSize(128 * 1024);
// Add your WebSocket endpoint(s) to the JettyWebSocketServerContainer.
container.addMapping("/ws/myURI", MyJettyWebSocketEndPoint.class);
// Use JettyWebSocketCreator to have more control on the WebSocket endpoint creation.
container.addMapping("/ws/myOtherURI", (upgradeRequest, upgradeResponse) ->
// Advanced registration of a WebSocket endpoint.
container.addMapping("/ws/adv", (rq, rs, cb) ->
{
// Possibly inspect the upgrade request and modify the upgrade response.
upgradeResponse.setAcceptedSubProtocol("my-ws-protocol");
// Create the new WebSocket endpoint.
return new MyOtherJettyWebSocketEndPoint();
List<String> subProtocols = rq.getSubProtocols();
if (subProtocols.contains("my-ws-protocol"))
return new MyJettyWebSocketEndPoint();
return null;
});
});
// Starting the Server will start the ServletContextHandler.
// Starting the Server will start the ContextHandler and the WebSocketUpgradeHandler,
// which would run the configuration of the ServerWebSocketContainer.
server.start();
// end::jettyContainerAndEndpoints[]
// end::jettyContainerWithUpgradeHandler[]
}
@SuppressWarnings("InnerClassMayBeStatic")
// tag::jettyContainerUpgrade[]
public class ProgrammaticWebSocketUpgradeServlet extends HttpServlet
private static class EchoEndPoint
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
{
if (requiresWebSocketUpgrade(request))
{
// Retrieve the JettyWebSocketServerContainer.
JettyWebSocketServerContainer container = JettyWebSocketServerContainer.getContainer(getServletContext());
// Use a JettyWebSocketCreator to inspect the upgrade request,
// possibly modify the upgrade response, and create the WebSocket endpoint.
JettyWebSocketCreator creator = (upgradeRequest, upgradeResponse) -> new MyJettyWebSocketEndPoint();
// Perform the direct WebSocket upgrade.
container.upgrade(creator, request, response);
}
else
{
// Normal handling of the HTTP request/response.
}
}
}
// end::jettyContainerUpgrade[]
private boolean requiresWebSocketUpgrade(HttpServletRequest request)
{
return false;
}
public void jettyWebSocketServletMain() throws Exception
public void jettyContainerWithContainer() throws Exception
{
// tag::jettyWebSocketServletMain[]
// tag::jettyContainerWithContainer[]
// Create a Server with a ServerConnector listening on port 8080.
Server server = new Server(8080);
// Create a ServletContextHandler with the given context path.
ServletContextHandler handler = new ServletContextHandler("/ctx");
server.setHandler(handler);
// Create a ContextHandler with the given context path.
ContextHandler contextHandler = new ContextHandler("/ctx");
server.setHandler(contextHandler);
// Setup the JettyWebSocketServerContainer to initialize WebSocket components.
JettyWebSocketServletContainerInitializer.configure(handler, null);
// Create a ServerWebSocketContainer, which is also stored as an attribute in the context.
ServerWebSocketContainer container = ServerWebSocketContainer.ensure(server, contextHandler);
// Add your WebSocketServlet subclass to the ServletContextHandler.
handler.addServlet(MyJettyWebSocketServlet.class, "/ws/*");
// You can use WebSocketUpgradeHandler if you want, but it is not necessary.
// You can ignore the line below, it is shown only for reference.
WebSocketUpgradeHandler webSocketHandler = new WebSocketUpgradeHandler(container);
// Starting the Server will start the ServletContextHandler.
server.start();
// end::jettyWebSocketServletMain[]
}
@SuppressWarnings("InnerClassMayBeStatic")
// tag::jettyWebSocketServlet[]
public class MyJettyWebSocketServlet extends JettyWebSocketServlet
{
@Override
protected void configure(JettyWebSocketServletFactory factory)
// You can directly use ServerWebSocketContainer from any Handler.
contextHandler.setHandler(new Handler.Abstract()
{
// At most 1 MiB text messages.
factory.setMaxTextMessageSize(1048576);
// Add the WebSocket endpoint.
factory.addMapping("/ws/someURI", (upgradeRequest, upgradeResponse) ->
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Possibly inspect the upgrade request and modify the upgrade response.
// Retrieve the ServerWebSocketContainer.
ServerWebSocketContainer container = ServerWebSocketContainer.get(request.getContext());
// Create the new WebSocket endpoint.
return new MyJettyWebSocketEndPoint();
});
}
// Verify special conditions for which a request should be upgraded to WebSocket.
String pathInContext = Request.getPathInContext(request);
if (pathInContext.startsWith("/ws/echo") && request.getHeaders().contains("X-WS", "true"))
{
try
{
// This is a WebSocket upgrade request, perform a direct upgrade.
boolean upgraded = container.upgrade((rq, rs, cb) -> new EchoEndPoint(), request, response, callback);
if (upgraded)
return true;
// This was supposed to be a WebSocket upgrade request, but something went wrong.
Response.writeError(request, response, callback, HttpStatus.UPGRADE_REQUIRED_426);
return true;
}
catch (Exception x)
{
Response.writeError(request, response, callback, HttpStatus.UPGRADE_REQUIRED_426, "failed to upgrade", x);
return true;
}
}
else
{
// Handle a normal HTTP request.
response.setStatus(HttpStatus.OK_200);
callback.succeeded();
return true;
}
}
});
// Starting the Server will start the ContextHandler.
server.start();
// end::jettyContainerWithContainer[]
}
// end::jettyWebSocketServlet[]
@ServerEndpoint("/ws")
private static class MyJavaxWebSocketEndPoint
@ -347,29 +276,29 @@ public class WebSocketServerDocs
{
}
@WebSocket
private static class MyOtherJettyWebSocketEndPoint
{
}
public void uriTemplatePathSpec()
{
// tag::uriTemplatePathSpec[]
Server server = new Server(8080);
// tag::uriTemplatePathSpec[]
ServletContextHandler handler = new ServletContextHandler("/ctx");
server.setHandler(handler);
ContextHandler contextHandler = new ContextHandler("/ctx");
server.setHandler(contextHandler);
// Configure the JettyWebSocketServerContainer.
JettyWebSocketServletContainerInitializer.configure(handler, (servletContext, container) ->
// Create a WebSocketUpgradeHandler.
WebSocketUpgradeHandler webSocketHandler = WebSocketUpgradeHandler.from(server, contextHandler);
contextHandler.setHandler(webSocketHandler);
// Here you can access the ServerWebSocketContainer through the WebSocketUpgradeHandler APIs.
webSocketHandler.configure(container ->
{
container.addMapping("/ws/chat/{room}", (upgradeRequest, upgradeResponse) ->
container.addMapping("/ws/chat/{room}", (upgradeRequest, upgradeResponse, callback) ->
{
// Retrieve the URI template.
UriTemplatePathSpec pathSpec = (UriTemplatePathSpec)upgradeRequest.getServletAttribute(PathSpec.class.getName());
UriTemplatePathSpec pathSpec = (UriTemplatePathSpec)upgradeRequest.getAttribute(PathSpec.class.getName());
// Match the URI template.
Map<String, String> params = pathSpec.getPathParams(upgradeRequest.getRequestPath());
String pathInContext = Request.getPathInContext(upgradeRequest);
Map<String, String> params = pathSpec.getPathParams(pathInContext);
String room = params.get("room");
// Create the new WebSocket endpoint with the URI template information.

View File

@ -13,13 +13,12 @@
package org.eclipse.jetty.websocket.core.server;
import java.io.IOException;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.server.internal.HandshakerSelector;
public interface Handshaker
@ -55,7 +54,7 @@ public interface Handshaker
* @param components the WebSocket components
* @param defaultCustomizer the customizer
* @return true if a response was generated, false if a response is not generated
* @throws IOException there is an error during the upgrade
* @throws WebSocketException there is an error during the upgrade
*/
boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws IOException;
boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws WebSocketException;
}

View File

@ -14,9 +14,8 @@
package org.eclipse.jetty.websocket.core.server;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import org.eclipse.jetty.http.pathmap.MappedResource;
import org.eclipse.jetty.http.pathmap.MatchedResource;
import org.eclipse.jetty.http.pathmap.PathMappings;
import org.eclipse.jetty.http.pathmap.PathSpec;
@ -214,22 +213,30 @@ public class WebSocketMappings implements Dumpable, LifeCycle.Listener
}
/**
* Get the matching {@link MappedResource} for the provided target.
* <p>Returns the mapped {@link WebSocketNegotiator} if there is a match of given {@code request}
* against a {@link #addMapping(PathSpec, WebSocketNegotiator) registered mapping}, otherwise
* returns {@code null} if there is no match.</p>
* <p>If there is a match, the given consumer is invoked with the {@link PathSpec} that matched
* so that, for example, it can be stored as a request attribute for later usage.
* This is important in case of {@link UriTemplatePathSpec}, where applications may want to
* extract the values of the template groups.</p>
*
* @param target the target path
* @param pathSpecConsumer the path
* @return the matching resource, or null if no match.
* @param request the request to match
* @param consumer the consumer to invoke in case of match
* @return the {@link WebSocketNegotiator} if there is a match,
* or {@code null} if there is no match
*/
public WebSocketNegotiator getMatchedNegotiator(String target, Consumer<PathSpec> pathSpecConsumer)
public WebSocketNegotiator getMatchedNegotiator(Request request, BiConsumer<Request, PathSpec> consumer)
{
MatchedResource<WebSocketNegotiator> mapping = this.mappings.getMatched(target);
String target = Request.getPathInContext(request);
MatchedResource<WebSocketNegotiator> mapping = mappings.getMatched(target);
if (mapping == null)
return null;
pathSpecConsumer.accept(mapping.getPathSpec());
consumer.accept(request, mapping.getPathSpec());
WebSocketNegotiator negotiator = mapping.getResource();
if (LOG.isDebugEnabled())
LOG.debug("WebSocket Negotiated detected on {} for endpoint {}", target, negotiator);
LOG.debug("WebSocket Negotiated detected on {} for endpoint {}", request, negotiator);
return negotiator;
}
@ -247,21 +254,21 @@ public class WebSocketMappings implements Dumpable, LifeCycle.Listener
* @param callback the callback
* @param defaultCustomizer the customizer
* @return true if the WebSocket upgrade was accepted
* @throws IOException there is an error during the upgrade
* @throws WebSocketException there is an error during the upgrade
*/
public boolean upgrade(Request request, Response response, Callback callback, Configuration.Customizer defaultCustomizer) throws IOException
public boolean upgrade(Request request, Response response, Callback callback, Configuration.Customizer defaultCustomizer) throws WebSocketException
{
String target = Request.getPathInContext(request);
WebSocketNegotiator negotiator = getMatchedNegotiator(target, pathSpec ->
{
// Store PathSpec resource mapping as request attribute,
// for WebSocketCreator implementors to use later if they wish.
request.setAttribute(PathSpec.class.getName(), pathSpec);
});
WebSocketNegotiator negotiator = getMatchedNegotiator(request, WebSocketMappings::storePathSpec);
return upgrade(negotiator, request, response, callback, defaultCustomizer);
}
private static void storePathSpec(Request request, PathSpec pathSpec)
{
// Store PathSpec resource mapping as request attribute,
// for WebSocketCreator implementors to use later if they wish.
request.setAttribute(PathSpec.class.getName(), pathSpec);
}
/**
* <p>Attempts to find a WebSocket mapping and upgrade a request to WebSocket.</p>
*
@ -276,9 +283,9 @@ public class WebSocketMappings implements Dumpable, LifeCycle.Listener
* @param callback the callback
* @param defaultCustomizer the customizer
* @return true if the WebSocket upgrade was accepted
* @throws IOException there is an error during the upgrade
* @throws WebSocketException there is an error during the upgrade
*/
public boolean upgrade(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, Configuration.Customizer defaultCustomizer) throws IOException
public boolean upgrade(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, Configuration.Customizer defaultCustomizer) throws WebSocketException
{
if (negotiator == null)
return false;

View File

@ -15,7 +15,6 @@ package org.eclipse.jetty.websocket.core.server;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.http.pathmap.ServletPathSpec;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
@ -49,7 +48,7 @@ public class WebSocketUpgradeHandler extends Handler.Wrapper
public void addMapping(String pathSpec, WebSocketNegotiator negotiator)
{
mappings.addMapping(new ServletPathSpec(pathSpec), negotiator);
mappings.addMapping(WebSocketMappings.parsePathSpec(pathSpec), negotiator);
}
public void addMapping(PathSpec pathSpec, WebSocketNegotiator negotiator)
@ -65,29 +64,15 @@ public class WebSocketUpgradeHandler extends Handler.Wrapper
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
String target = Request.getPathInContext(request);
WebSocketNegotiator negotiator = mappings.getMatchedNegotiator(target, pathSpec ->
{
// Store PathSpec resource mapping as request attribute,
// for WebSocketCreator implementors to use later if they wish.
request.setAttribute(PathSpec.class.getName(), pathSpec);
});
if (negotiator == null)
{
return super.handle(request, response, callback);
}
try
{
if (mappings.upgrade(negotiator, request, response, callback, customizer))
if (mappings.upgrade(request, response, callback, customizer))
return true;
return super.handle(request, response, callback);
}
catch (Throwable t)
catch (Throwable x)
{
callback.failed(t);
Response.writeError(request, response, callback, x);
return true;
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
@ -52,7 +51,7 @@ public abstract class AbstractHandshaker implements Handshaker
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws IOException
public boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws WebSocketException
{
if (!isWebSocketUpgradeRequest(request))
return false;

View File

@ -13,13 +13,12 @@
package org.eclipse.jetty.websocket.core.server.internal;
import java.io.IOException;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
@ -40,7 +39,7 @@ public class HandshakerSelector implements Handshaker
}
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws IOException
public boolean upgradeRequest(WebSocketNegotiator negotiator, Request request, Response response, Callback callback, WebSocketComponents components, Configuration.Customizer defaultCustomizer) throws WebSocketException
{
// Try HTTP/1.1 WS upgrade, if this fails try an HTTP/2 WS upgrade if no response was committed.
return rfc6455.upgradeRequest(negotiator, request, response, callback, components, defaultCustomizer) ||

View File

@ -14,24 +14,29 @@
package org.eclipse.jetty.websocket.api;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* Callback for Write events.
* <p>
* NOTE: We don't expose org.eclipse.jetty.util.Callback here as that would complicate matters with the WebAppContext's classloader isolation.
* <p>A callback object that handles success/failure events of asynchronous operations.</p>
* <p>NOTE: This interface is almost identical to {@code org.eclipse.jetty.util.Callback},
* which however cannot be used in the Jetty WebSocket APIs for web application classloading
* reasons.</p>
*/
public interface Callback
{
/**
* <p>Empty implementation.</p>
*/
Callback NOOP = new Callback()
{
};
/**
* Creates a callback from the given success and failure lambdas.
* <p>Creates a callback from the given success and failure lambdas.</p>
*
* @param success called when the callback succeeds
* @param failure called when the callback fails
* @param success the success action to invoke when the callback succeeds
* @param failure the failure consumer to invoke when the callback fails
* @return a new callback
*/
static Callback from(Runnable success, Consumer<Throwable> failure)
@ -53,9 +58,7 @@ public interface Callback
}
/**
* <p>
* Callback invoked when the write succeeds.
* </p>
* <p>Method to invoke to succeed the callback.</p>
*
* @see #fail(Throwable)
*/
@ -64,18 +67,53 @@ public interface Callback
}
/**
* <p>
* Callback invoked when the write fails.
* </p>
* <p>Method to invoke to fail the callback.</p>
*
* @param x the reason for the write failure
* @param x the failure
*/
default void fail(Throwable x)
{
}
/**
* <p>Converts this callback into a {@link BiConsumer} that can be passed
* to {@link CompletableFuture#whenComplete(BiConsumer)}.</p>
* <p>When the {@link BiConsumer} is accepted, this callback is completed.</p>
*
* @return a {@link BiConsumer} that completes this callback
* @see Completable#with(Consumer)
*/
default BiConsumer<? super Void, ? super Throwable> asBiConsumer()
{
return (r, x) ->
{
if (x == null)
succeed();
else
fail(x);
};
}
/**
* <p>A {@link Callback} that is also a {@link CompletableFuture}.</p>
* <p>Applications can pass an instance of this class as a {@link Callback},
* but also use the {@link CompletableFuture} APIs.
*/
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* <p>Convenience method that creates a {@link Completable} that
* is passed to the given consumer and then returned.</p>
* <p>For example:</p>
* <pre>{@code
* Completable.with(completable -> session.sendText("TEXT", completable))
* .thenRun(session::demand);
* }</pre>
*
* @param consumer the consumer that receives the completable
* @return a new completable passed to the consumer
* @see #compose(Consumer)
*/
public static Completable with(Consumer<Completable> consumer)
{
Completable completable = new Completable();
@ -95,10 +133,33 @@ public interface Callback
completeExceptionally(x);
}
/**
* <p>Returns a new {@link Completable} that, when this {@link Completable}
* succeeds, is passed to the given consumer and then returned.</p>
* <p>If this {@link Completable} fails, the new {@link Completable} is
* also failed.</p>
* <p>For example:</p>
* <pre>{@code
* Callback.Completable.with(completable1 -> session.sendPartialText("hello", false, completable1))
* .compose(completable2 -> session.sendPartialText(" ", false, completable2))
* .compose(completable3 -> session.sendPartialText("world", true, completable3))
* .thenRun(session::demand);
* }</pre>
*
* @param consumer the consumer that receives the {@link Completable}
* @return a new {@link Completable} passed to the consumer
* @see #with(Consumer)
*/
public Completable compose(Consumer<Completable> consumer)
{
Completable completable = new Completable();
thenAccept(ignored -> consumer.accept(completable));
whenComplete((r, x) ->
{
if (x == null)
consumer.accept(completable);
else
completable.fail(x);
});
return completable;
}
}

View File

@ -21,8 +21,12 @@ import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.server.Context;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.Invocable;
@ -31,10 +35,12 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
import org.eclipse.jetty.websocket.api.WebSocketSessionListener;
import org.eclipse.jetty.websocket.common.SessionTracker;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.server.FrameHandlerFactory;
import org.eclipse.jetty.websocket.core.server.WebSocketMappings;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
import org.eclipse.jetty.websocket.server.internal.ServerFrameHandlerFactory;
import org.eclipse.jetty.websocket.server.internal.ServerUpgradeRequestDelegate;
import org.eclipse.jetty.websocket.server.internal.ServerUpgradeResponseDelegate;
@ -45,11 +51,54 @@ import org.slf4j.LoggerFactory;
* <p>A server-side WebSocket container that allows to {@link #addMapping(String, WebSocketCreator) map}
* URI paths to WebSocket endpoints and configure WebSocket parameters such as idle timeouts,
* max WebSocket message sizes, etc.</p>
* <p>Direct WebSocket upgrades not mapped to URI paths are possible via
* {@link #upgrade(WebSocketCreator, Request, Response, Callback)}.</p>
*/
public class ServerWebSocketContainer extends ContainerLifeCycle implements WebSocketContainer, Configurable, Invocable
public class ServerWebSocketContainer extends ContainerLifeCycle implements WebSocketContainer, Configurable, Invocable, Request.Handler
{
private static final Logger LOG = LoggerFactory.getLogger(ServerWebSocketContainer.class);
/**
* <p>Returns the {@link ServerWebSocketContainer}, ensuring that
* it is available via {@link #get(Context)}.</p>
* <p>If the {@link ServerWebSocketContainer} is not already available,
* an instance is created, stored to be available via {@link #get(Context)}
* and returned.</p>
* <p>This method should be invoked during the setup of the
* {@link Handler} hierarchy.</p>
*
* @param server the {@link Server} object used to lookup common WebSocket components
* @param contextHandler the {@link ContextHandler} used to store the {@link ServerWebSocketContainer}
* @return a non-{@code null} {@link ServerWebSocketContainer}
*/
public static ServerWebSocketContainer ensure(Server server, ContextHandler contextHandler)
{
Context context = contextHandler.getContext();
ServerWebSocketContainer container = get(context);
if (container == null)
{
WebSocketComponents components = WebSocketServerComponents.ensureWebSocketComponents(server, contextHandler);
WebSocketMappings mappings = new WebSocketMappings(components);
container = new ServerWebSocketContainer(mappings);
context.setAttribute(WebSocketContainer.class.getName(), container);
}
return container;
}
/**
* <p>Returns the {@link ServerWebSocketContainer} present as the context attribute
* under the name corresponding to the full qualified name of class
* {@link WebSocketContainer}.</p>
*
* @param context the {@link Context} to look for the attribute
* @return the {@link ServerWebSocketContainer} stored as an attribute,
* or {@code null} if no such attribute is present
*/
public static ServerWebSocketContainer get(Context context)
{
return (ServerWebSocketContainer)context.getAttribute(WebSocketContainer.class.getName());
}
private final List<WebSocketSessionListener> listeners = new ArrayList<>();
private final SessionTracker sessionTracker = new SessionTracker();
private final Configuration configuration = new Configuration();
@ -226,46 +275,81 @@ public class ServerWebSocketContainer extends ContainerLifeCycle implements WebS
if (mappings.getWebSocketNegotiator(pathSpec) != null)
throw new WebSocketException("Duplicate WebSocket Mapping for PathSpec " + pathSpec);
org.eclipse.jetty.websocket.core.server.WebSocketCreator coreCreator = (request, response, callback) ->
var coreCreator = newWebSocketCreator(creator);
mappings.addMapping(pathSpec, coreCreator, factory, configuration);
}
/**
* <p>Matches the given {@code request} against existing WebSocket mappings,
* upgrading to WebSocket if there is a match.</p>
* <p>Direct upgrades without using WebSocket mappings may be performed via
* {@link #upgrade(WebSocketCreator, Request, Response, Callback)}.</p>
* <p>When {@code true} is returned, a response has been sent to the client
* and the {@code callback} has been completed; either because of a successful
* WebSocket upgrade, or because an error has occurred.</p>
* <p>When {@code false} is returned, a response has not been sent to the
* client, and the {@code callback} has not been completed; typically because
* the request path does not match any existing WebSocket mappings, so that
* the request can be handled by other {@link Handler}s.</p>
*
* @param request the request to handle, possibly a WebSocket upgrade request
* @param response the response to handle
* @param callback the callback to complete when the handling is complete
* @return {@code true} in case of WebSocket upgrades or failures,
* {@code false} if the request was not handled
* @throws WebSocketException there is an error during the upgrade
* @see #addMapping(PathSpec, WebSocketCreator)
* @see #upgrade(WebSocketCreator, Request, Response, Callback)
*/
@Override
public boolean handle(Request request, Response response, Callback callback) throws WebSocketException
{
return mappings.upgrade(request, response, callback, configuration);
}
/**
* <p>Upgrades the given {@code request} without matching against the WebSocket mappings.</p>
* <p>When {@code true} is returned, a response has been sent to the client
* and the {@code callback} has been completed; either because of a successful
* WebSocket upgrade, or because an error has occurred.</p>
* <p>When {@code false} is returned, a response has not been sent to the
* client, and the {@code callback} has not been completed; for example because
* the request is not a WebSocket upgrade; in this case the caller must arrange
* to send a response and complete the callback.</p>
*
* @param creator the creator of the WebSocket endpoint
* @param request the request to upgrade, possibly a WebSocket upgrade request
* @param response the response
* @param callback the callback to complete when the upgrade is complete
* @return {@code true} in case of WebSocket upgrades or failures,
* {@code false} if the request was not upgraded
* @throws WebSocketException there is an error during the upgrade
* @see #handle(Request, Response, Callback)
*/
public boolean upgrade(WebSocketCreator creator, Request request, Response response, Callback callback) throws WebSocketException
{
var coreCreator = newWebSocketCreator(creator);
WebSocketNegotiator negotiator = WebSocketNegotiator.from(coreCreator, factory);
return mappings.upgrade(negotiator, request, response, callback, configuration);
}
private org.eclipse.jetty.websocket.core.server.WebSocketCreator newWebSocketCreator(WebSocketCreator creator)
{
return (rq, rs, cb) ->
{
try
{
Object webSocket = creator.createWebSocket(new ServerUpgradeRequestDelegate(request), new ServerUpgradeResponseDelegate(request, response), callback);
Object webSocket = creator.createWebSocket(new ServerUpgradeRequestDelegate(rq), new ServerUpgradeResponseDelegate(rq, rs), cb);
if (webSocket == null)
callback.succeeded();
cb.succeeded();
return webSocket;
}
catch (Throwable x)
{
callback.failed(x);
cb.failed(x);
return null;
}
};
mappings.addMapping(pathSpec, coreCreator, factory, configuration);
}
public boolean handle(Request request, Response response, Callback callback)
{
String target = Request.getPathInContext(request);
WebSocketNegotiator negotiator = mappings.getMatchedNegotiator(target, pathSpec ->
{
// Store PathSpec resource mapping as request attribute,
// for WebSocketCreator implementors to use later if they wish.
request.setAttribute(PathSpec.class.getName(), pathSpec);
});
if (negotiator == null)
return false;
try
{
return mappings.upgrade(negotiator, request, response, callback, configuration);
}
catch (Throwable x)
{
callback.failed(x);
return true;
}
}
/**

View File

@ -95,7 +95,7 @@ public class WebSocketUpgradeHandler extends Handler.Wrapper
private final ServerWebSocketContainer _container;
private WebSocketUpgradeHandler(ServerWebSocketContainer container)
public WebSocketUpgradeHandler(ServerWebSocketContainer container)
{
_container = container;
addBean(container);
@ -117,11 +117,24 @@ public class WebSocketUpgradeHandler extends Handler.Wrapper
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
if (_container.handle(request, response, callback))
if (handle(_container, request, response, callback))
return true;
return super.handle(request, response, callback);
}
protected boolean handle(ServerWebSocketContainer container, Request request, Response response, Callback callback)
{
try
{
return container.handle(request, response, callback);
}
catch (Throwable x)
{
Response.writeError(request, response, callback, x);
return true;
}
}
@Override
public InvocationType getInvocationType()
{

View File

@ -0,0 +1,231 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.util.WSURI;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.ServerWebSocketContainer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.tests.EchoSocket;
import org.eclipse.jetty.websocket.tests.EventSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class DirectUpgradeTest
{
private Server server;
private HttpClient httpClient;
private WebSocketClient wsClient;
public void start(Function<Server, ContextHandler> factory) throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server, 1, 1);
server.addConnector(connector);
ContextHandler context = factory.apply(server);
server.setHandler(context);
server.start();
httpClient = new HttpClient(new HttpClientTransportOverHTTP(1));
wsClient = new WebSocketClient(httpClient);
wsClient.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(httpClient);
LifeCycle.stop(server);
}
@Test
public void testAnnotatedDirectWebSocketUpgradeInChildHandler() throws Exception
{
testDirectWebSocketUpgradeInChildHandler(EchoSocket::new);
}
@Test
public void testListenerDirectWebSocketUpgradeInChildHandler() throws Exception
{
testDirectWebSocketUpgradeInChildHandler(EchoListener::new);
}
private void testDirectWebSocketUpgradeInChildHandler(Supplier<Object> supplier) throws Exception
{
start(server ->
{
ContextHandler context = new ContextHandler("/ctx");
// Create a WebSocketUpgradeHandler with no mappings.
WebSocketUpgradeHandler wsHandler = WebSocketUpgradeHandler.from(server, context);
context.setHandler(wsHandler);
// Set up the Handler that will perform the upgrade.
wsHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
ServerWebSocketContainer container = ServerWebSocketContainer.get(request.getContext());
assertNotNull(container);
// Direct upgrade.
return container.upgrade((upgradeRequest, upgradeResponse, upgradeCallback) -> supplier.get(), request, response, callback);
}
});
return context;
});
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ctx/ws"));
EventSocket clientEndpoint = new EventSocket();
try (Session session = wsClient.connect(clientEndpoint, wsUri).get(5, TimeUnit.SECONDS))
{
String text = "ECHO";
session.sendText(text, null);
String echo = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS);
assertEquals(text, echo);
}
}
@Test
public void testDirectWebSocketUpgradeInChildHandlerWithoutWebSocketUpgradeHandler() throws Exception
{
start(server ->
{
ContextHandler context = new ContextHandler("/ctx");
// Do not set up a WebSocketUpgradeHandler.
// Set up the Handler that will perform the upgrade.
context.setHandler(new Handler.Abstract()
{
private ServerWebSocketContainer container;
@Override
protected void doStart() throws Exception
{
super.doStart();
Server server = getServer();
assertNotNull(server);
ContextHandler contextHandler = ContextHandler.getCurrentContextHandler();
assertNotNull(contextHandler);
// Alternatively, the container can be created when the ContextHandler is created.
container = ServerWebSocketContainer.ensure(getServer(), contextHandler);
assertNotNull(container);
}
@Override
public boolean handle(Request request, Response response, Callback callback)
{
// Direct upgrade.
return container.upgrade((upgradeRequest, upgradeResponse, upgradeCallback) -> new EchoSocket(), request, response, callback);
}
});
return context;
});
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ctx/ws"));
EventSocket clientEndpoint = new EventSocket();
Session session = wsClient.connect(clientEndpoint, wsUri).get(5, TimeUnit.SECONDS);
String text = "ECHO";
session.sendText(text, null);
String echo = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS);
assertEquals(text, echo);
}
@Test
public void testNotWebSocketUpgrade() throws Exception
{
start(server ->
{
ContextHandler context = new ContextHandler("/ctx");
ServerWebSocketContainer container = ServerWebSocketContainer.ensure(server, context);
// Allow for WebSocketUpgradeHandler to be subclassed.
WebSocketUpgradeHandler wsHandler = new WebSocketUpgradeHandler(container)
{
@Override
protected boolean handle(ServerWebSocketContainer container, Request request, Response response, Callback callback)
{
// Modify the behavior to do a direct upgrade instead of mappings upgrade.
return container.upgrade((upgradeRequest, upgradeResponse, upgradeCallback) -> new EchoSocket(), request, response, callback);
}
};
context.setHandler(wsHandler);
// Since the request is not a WebSocket upgrade, this Handler will handle it.
wsHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
Content.Sink.write(response, true, "HELLO", callback);
return true;
}
});
return context;
});
// Send a request that is not a WebSocket upgrade.
// The upgrade will not happen and the child Handler will be called.
URI uri = server.getURI().resolve("/ctx/ws");
ContentResponse response = httpClient.newRequest(uri)
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals("HELLO", response.getContentAsString());
}
private static class EchoListener implements Session.Listener
{
private Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
session.demand();
}
@Override
public void onWebSocketBinary(ByteBuffer payload, org.eclipse.jetty.websocket.api.Callback callback)
{
org.eclipse.jetty.websocket.api.Callback.Completable.with(c -> session.sendBinary(payload, c))
.whenComplete(callback.asBiConsumer())
.thenRun(session::demand);
}
@Override
public void onWebSocketText(String message)
{
session.sendText(message, org.eclipse.jetty.websocket.api.Callback.from(session::demand, Throwable::printStackTrace));
}
}
}

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.ee10.websocket.server.internal.DelegatedServerUpgradeRe
import org.eclipse.jetty.ee10.websocket.server.internal.JettyServerFrameHandlerFactory;
import org.eclipse.jetty.ee10.websocket.servlet.WebSocketUpgradeFilter;
import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.LifeCycle;
@ -46,7 +46,6 @@ import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.WebSocketCreator;
import org.eclipse.jetty.websocket.core.server.WebSocketMappings;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
@ -218,29 +217,24 @@ public class JettyWebSocketServerContainer extends ContainerLifeCycle implements
ServletContextResponse servletContextResponse = servletContextRequest.getServletContextResponse();
WebSocketNegotiator negotiator = WebSocketNegotiator.from(coreCreator, frameHandlerFactory);
Handshaker handshaker = webSocketMappings.getHandshaker();
FutureCallback callback = new FutureCallback();
try
// Set the wrapped req and resp as attributes on the ServletContext Request/Response, so they
// are accessible when websocket-core calls back the Jetty WebSocket creator.
servletContextRequest.setAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_REQUEST_ATTRIBUTE, request);
servletContextRequest.setAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_RESPONSE_ATTRIBUTE, response);
try (Blocker.Callback callback = Blocker.callback())
{
// Set the wrapped req and resp as attributes on the ServletContext Request/Response, so they
// are accessible when websocket-core calls back the Jetty WebSocket creator.
servletContextRequest.setAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_REQUEST_ATTRIBUTE, request);
servletContextRequest.setAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_RESPONSE_ATTRIBUTE, response);
if (handshaker.upgradeRequest(negotiator, servletContextRequest, servletContextResponse, callback, components, customizer))
{
boolean upgraded = webSocketMappings.upgrade(negotiator, servletContextRequest, servletContextResponse, callback, customizer);
if (upgraded)
callback.block();
return true;
}
return upgraded;
}
finally
{
servletContextRequest.removeAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_REQUEST_ATTRIBUTE);
servletContextRequest.removeAttribute(WebSocketConstants.WEBSOCKET_WRAPPED_RESPONSE_ATTRIBUTE);
}
return false;
}
@Override