Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-4226-JavaxWebSocketJPMS

This commit is contained in:
Lachlan Roberts 2020-01-30 10:37:34 +11:00
commit 82c61c48eb
144 changed files with 2183 additions and 1004 deletions

View File

@ -16,22 +16,43 @@
// ========================================================================
//
=== Upgrading from Jetty 9.x to Jetty 10.0.x
=== Upgrading from Jetty 9.4.x to Jetty 10.0.x
The purpose of this guide is to assist users migrating from Jetty 9 to 10.
The purpose of this guide is to assist users migrating from Jetty 9.4 to 10.
It is not comprehensive, but covers many of the major changes included in the release that may prove as problem areas for users.
//TODO - Make note of any specific required Java versions.
==== Required Java Version
Jetty 10 requires, at a minimum, Java 9 to function.
Items such as the Java Platform Module System (JPMS), which Jetty 10 supports, are not available in earlier versions of Java.
==== Removed Classes
//TODO - Insert major removed/refactored classes from Jetty-9.x.x to Jetty-10.0.x
==== Changes to Websocket
//TODO - List of changes to Websocket -- Joakim/Lachlan
==== `javax.mail` and `javax.transaction`
Both `javax.mail` and `javax.transaction` have been removed from the Jetty Distribution in Jetty 10.
If you require these jars, you will need to enable the `ext` link:#startup-modules[module] and copy the files to your `$JETTY_BASE/lib/ext` directory.
==== Removed Classes
//TODO - Insert major removed/refactored classes from Jetty-9.x.x to Jetty-10.0.x
==== Module Changes in Jetty 10.0
===== New Modules in Jetty 10.0
//TODO - Insert new modules introduced in Jetty 10
===== Changes to Existing Modules in Jetty 10.0
//TODO - Insert module changes introduced in Jetty 10
==== Changes to Sessions
//TODO - List of changes to Sessions -- Jan
==== Removal of System Properties(?)
//TODO - List of removed System bits --- Greg

View File

@ -19,14 +19,20 @@
package org.eclipse.jetty.http;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
// TODO consider replacing this with java.net.HttpCookie (once it supports RFC6265)
public class HttpCookie
{
private static final Logger LOG = Log.getLogger(HttpCookie.class);
private static final String __COOKIE_DELIM = "\",;\\ \t";
private static final String __01Jan1970_COOKIE = DateGenerator.formatCookieDate(0).trim();
@ -41,6 +47,11 @@ public class HttpCookie
public static final String SAME_SITE_NONE_COMMENT = SAME_SITE_COMMENT + "NONE__";
public static final String SAME_SITE_LAX_COMMENT = SAME_SITE_COMMENT + "LAX__";
public static final String SAME_SITE_STRICT_COMMENT = SAME_SITE_COMMENT + "STRICT__";
/**
* Name of context attribute with default SameSite cookie value
*/
public static final String SAME_SITE_DEFAULT_ATTRIBUTE = "org.eclipse.jetty.cookie.sameSiteDefault";
public enum SameSite
{
@ -70,7 +81,7 @@ public class HttpCookie
private final boolean _httpOnly;
private final long _expiration;
private final SameSite _sameSite;
public HttpCookie(String name, String value)
{
this(name, value, -1);
@ -445,6 +456,42 @@ public class HttpCookie
return null;
}
/**
* Get the default value for SameSite cookie attribute, if one
* has been set for the given context.
*
* @param contextAttributes the context to check for default SameSite value
* @return the default SameSite value or null if one does not exist
* @throws IllegalStateException if the default value is not a permitted value
*/
public static SameSite getSameSiteDefault(Attributes contextAttributes)
{
if (contextAttributes == null)
return null;
Object o = contextAttributes.getAttribute(SAME_SITE_DEFAULT_ATTRIBUTE);
if (o == null)
{
if (LOG.isDebugEnabled())
LOG.debug("No default value for SameSite");
return null;
}
if (o instanceof SameSite)
return (SameSite)o;
try
{
SameSite samesite = Enum.valueOf(SameSite.class, o.toString().trim().toUpperCase(Locale.ENGLISH));
contextAttributes.setAttribute(SAME_SITE_DEFAULT_ATTRIBUTE, samesite);
return samesite;
}
catch (Exception e)
{
LOG.warn("Bad default value {} for SameSite", o);
throw new IllegalStateException(e);
}
}
public static String getCommentWithoutAttributes(String comment)
{
if (comment == null)

View File

@ -18,8 +18,19 @@
package org.eclipse.jetty.http;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.Enumeration;
import java.util.EventListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.eclipse.jetty.http.HttpCookie.SameSite;
import org.eclipse.jetty.util.AttributesMap;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@ -41,6 +52,32 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpCookieTest
{
@Test
public void testDefaultSameSite()
{
AttributesMap context = new AttributesMap();
//test null value for default
assertNull(HttpCookie.getSameSiteDefault(context));
//test good value for default as SameSite enum
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, SameSite.LAX);
assertEquals(SameSite.LAX, HttpCookie.getSameSiteDefault(context));
//test good value for default as String
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "NONE");
assertEquals(SameSite.NONE, HttpCookie.getSameSiteDefault(context));
//test case for default as String
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "sTrIcT");
assertEquals(SameSite.STRICT, HttpCookie.getSameSiteDefault(context));
//test bad value for default as String
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "fooBAR");
assertThrows(IllegalStateException.class,
() -> HttpCookie.getSameSiteDefault(context));
}
@Test
public void testConstructFromSetCookie()
{

View File

@ -397,11 +397,21 @@ public class ForwardedRequestCustomizer implements Customizer
request.setSecure(true);
}
if (forwarded._host != null)
if (forwarded._server != null && forwarded._host instanceof PortSetHostPort)
{
httpFields.put(new HostPortHttpField(forwarded._server, forwarded._host.getPort()));
request.setAuthority(forwarded._server, forwarded._host.getPort());
}
else if (forwarded._host != null)
{
httpFields.put(new HostPortHttpField(forwarded._host));
request.setAuthority(forwarded._host.getHost(), forwarded._host.getPort());
}
else if (forwarded._server != null)
{
httpFields.put(new HostPortHttpField(forwarded._server));
request.setAuthority(forwarded._server, 0);
}
if (forwarded._for != null)
{
@ -544,6 +554,7 @@ public class ForwardedRequestCustomizer implements Customizer
String _proto;
HostPort _for;
HostPort _host;
String _server;
public Forwarded(Request request, HttpConfiguration config)
{
@ -596,7 +607,7 @@ public class ForwardedRequestCustomizer implements Customizer
{
if (getProxyAsAuthority())
return;
handleHost(field);
_server = getLeftMost(field.getValue());
}
@SuppressWarnings("unused")

View File

@ -190,13 +190,42 @@ public class Response implements HttpServletResponse
{
if (StringUtil.isBlank(cookie.getName()))
throw new IllegalArgumentException("Cookie.name cannot be blank/null");
// add the set cookie
_fields.add(new SetCookieHttpField(cookie, getHttpChannel().getHttpConfiguration().getResponseCookieCompliance()));
_fields.add(new SetCookieHttpField(checkSameSite(cookie), getHttpChannel().getHttpConfiguration().getResponseCookieCompliance()));
// Expire responses with set-cookie headers so they do not get cached.
_fields.put(__EXPIRES_01JAN1970);
}
/**
* Check that samesite is set on the cookie. If not, use a
* context default value, if one has been set.
*
* @param cookie the cookie to check
* @return either the original cookie, or a new one that has the samesit default set
*/
private HttpCookie checkSameSite(HttpCookie cookie)
{
if (cookie == null || cookie.getSameSite() != null)
return cookie;
//sameSite is not set, use the default configured for the context, if one exists
SameSite contextDefault = HttpCookie.getSameSiteDefault(_channel.getRequest().getContext());
if (contextDefault == null)
return cookie; //no default set
return new HttpCookie(cookie.getName(),
cookie.getValue(),
cookie.getDomain(),
cookie.getPath(),
cookie.getMaxAge(),
cookie.isHttpOnly(),
cookie.isSecure(),
cookie.getComment(),
cookie.getVersion(),
contextDefault);
}
@Override
public void addCookie(Cookie cookie)
@ -264,7 +293,7 @@ public class Response implements HttpServletResponse
else if (!cookie.getPath().equals(oldCookie.getPath()))
continue;
i.set(new SetCookieHttpField(cookie, compliance));
i.set(new SetCookieHttpField(checkSameSite(cookie), compliance));
return;
}
}

View File

@ -412,6 +412,34 @@ public class ForwardedRequestCustomizerTest
.requestURL("https://www.example.com:4333/")
.remoteAddr("8.5.4.3").remotePort(2222)
),
Arguments.of(new Request("X-Forwarded-* (Server before Host)")
.headers(
"GET / HTTP/1.1",
"Host: myhost",
"X-Forwarded-Proto: https",
"X-Forwarded-Server: fw.example.com",
"X-Forwarded-Host: www.example.com",
"X-Forwarded-Port: 4333",
"X-Forwarded-For: 8.5.4.3:2222"
),
new Expectations()
.scheme("https").serverName("www.example.com").serverPort(4333)
.requestURL("https://www.example.com:4333/")
.remoteAddr("8.5.4.3").remotePort(2222)
),
Arguments.of(new Request("X-Forwarded-* (Server and Port)")
.headers(
"GET / HTTP/1.1",
"Host: myhost",
"X-Forwarded-Server: fw.example.com",
"X-Forwarded-Port: 4333",
"X-Forwarded-For: 8.5.4.3:2222"
),
new Expectations()
.scheme("http").serverName("fw.example.com").serverPort(4333)
.requestURL("http://fw.example.com:4333/")
.remoteAddr("8.5.4.3").remotePort(2222)
),
// =================================================================
// Mixed Behavior

View File

@ -32,10 +32,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Stream;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.Cookie;
@ -1015,6 +1018,32 @@ public class ResponseTest
assertEquals("name=value; Path=/path; Domain=domain; Secure; HttpOnly", set);
}
@Test
public void testAddCookieSameSiteDefault() throws Exception
{
Response response = getResponse();
TestServletContextHandler context = new TestServletContextHandler();
_channel.getRequest().setContext(context.getServletContext());
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, HttpCookie.SameSite.STRICT);
Cookie cookie = new Cookie("name", "value");
cookie.setDomain("domain");
cookie.setPath("/path");
cookie.setSecure(true);
cookie.setComment("comment__HTTP_ONLY__");
response.addCookie(cookie);
String set = response.getHttpFields().get("Set-Cookie");
assertEquals("name=value; Path=/path; Domain=domain; Secure; HttpOnly; SameSite=Strict", set);
response.getHttpFields().remove("Set-Cookie");
//test bad default samesite value
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "FooBar");
assertThrows(IllegalStateException.class,
() -> response.addCookie(cookie));
}
@Test
public void testAddCookieComplianceRFC2965()
@ -1154,6 +1183,23 @@ public class ResponseTest
List<String> actual = Collections.list(response.getHttpFields().getValues("Set-Cookie"));
assertThat("HttpCookie order", actual, hasItems(expected));
}
@Test
public void testReplaceHttpCookieSameSite()
{
Response response = getResponse();
TestServletContextHandler context = new TestServletContextHandler();
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "LAX");
_channel.getRequest().setContext(context.getServletContext());
//replace with no prior does an add
response.replaceCookie(new HttpCookie("Foo", "123456"));
String set = response.getHttpFields().get("Set-Cookie");
assertEquals("Foo=123456; SameSite=Lax", set);
//check replacement
response.replaceCookie(new HttpCookie("Foo", "other"));
set = response.getHttpFields().get("Set-Cookie");
assertEquals("Foo=other; SameSite=Lax", set);
}
@Test
public void testReplaceParsedHttpCookie()
@ -1179,6 +1225,20 @@ public class ResponseTest
actual = Collections.list(response.getHttpFields().getValues("Set-Cookie"));
assertThat(actual, hasItems(new String[]{"Foo=replaced; Path=/path; Domain=Bah"}));
}
@Test
public void testReplaceParsedHttpCookieSiteDefault()
{
Response response = getResponse();
TestServletContextHandler context = new TestServletContextHandler();
context.setAttribute(HttpCookie.SAME_SITE_DEFAULT_ATTRIBUTE, "LAX");
_channel.getRequest().setContext(context.getServletContext());
response.addHeader(HttpHeader.SET_COOKIE.asString(), "Foo=123456");
response.replaceCookie(new HttpCookie("Foo", "value"));
String set = response.getHttpFields().get("Set-Cookie");
assertEquals("Foo=value; SameSite=Lax", set);
}
@Test
public void testFlushAfterFullContent() throws Exception
@ -1208,4 +1268,36 @@ public class ResponseTest
super(handler, new SessionData(id, "", "0.0.0.0", 0, 0, 0, 300));
}
}
private static class TestServletContextHandler extends ContextHandler
{
private class Context extends ContextHandler.Context
{
private Map<String, Object> _attributes = new HashMap<>();
@Override
public Object getAttribute(String name)
{
return _attributes.get(name);
}
@Override
public Enumeration<String> getAttributeNames()
{
return Collections.enumeration(_attributes.keySet());
}
@Override
public void setAttribute(String name, Object object)
{
_attributes.put(name,object);
}
@Override
public void removeAttribute(String name)
{
_attributes.remove(name);
}
}
}
}

View File

@ -1299,15 +1299,15 @@ public class ServletContextHandler extends ContextHandler
@Override
public boolean setInitParameter(String name, String value)
{
//since servlet spec 4.0
Objects.requireNonNull(name);
if (!isStarting())
throw new IllegalStateException();
if (!_enabled)
throw new UnsupportedOperationException();
//since servlet spec 4.0
Objects.requireNonNull(name);
return super.setInitParameter(name, value);
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -138,6 +139,40 @@ public class FutureCallback implements Future<Void>, Callback
throw new ExecutionException(_cause);
}
public void block() throws IOException
{
block(-1, TimeUnit.SECONDS);
}
public void block(long timeout, TimeUnit unit) throws IOException
{
try
{
if (timeout > 0)
get(timeout, unit);
else
get();
}
catch (InterruptedException e)
{
InterruptedIOException exception = new InterruptedIOException();
exception.initCause(e);
throw exception;
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof RuntimeException)
throw new RuntimeException(cause);
else
throw new IOException(cause);
}
catch (TimeoutException e)
{
throw new IOException(e);
}
}
public static void rethrow(ExecutionException e) throws IOException
{
Throwable cause = e.getCause();

View File

@ -27,6 +27,7 @@ module org.eclipse.jetty.websocket.core
exports org.eclipse.jetty.websocket.core;
exports org.eclipse.jetty.websocket.core.client;
exports org.eclipse.jetty.websocket.core.server;
exports org.eclipse.jetty.websocket.core.exception;
exports org.eclipse.jetty.websocket.core.internal to org.eclipse.jetty.util;
requires jetty.servlet.api;

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.util.compression.DeflaterPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
@ManagedObject("Abstract Extension")
public class AbstractExtension implements Extension
@ -36,7 +35,7 @@ public class AbstractExtension implements Extension
private ExtensionConfig config;
private OutgoingFrames nextOutgoing;
private IncomingFrames nextIncoming;
private WebSocketCoreSession coreSession;
private Configuration configuration;
private DeflaterPool deflaterPool;
private InflaterPool inflaterPool;
@ -169,14 +168,14 @@ public class AbstractExtension implements Extension
}
@Override
public void setWebSocketCoreSession(WebSocketCoreSession coreSession)
public void setCoreSession(CoreSession coreSession)
{
this.coreSession = coreSession;
this.configuration = coreSession;
}
protected WebSocketCoreSession getWebSocketCoreSession()
protected Configuration getConfiguration()
{
return coreSession;
return configuration;
}
@Override

View File

@ -25,6 +25,9 @@ import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.exception.BadPayloadException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.NullAppendable;
/**
* Representation of a WebSocket Close (status code &amp; reason)

View File

@ -0,0 +1,210 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.time.Duration;
public interface Configuration
{
/**
* Get the Idle Timeout
*
* @return the idle timeout
*/
Duration getIdleTimeout();
/**
* Get the Write Timeout
*
* @return the write timeout
*/
Duration getWriteTimeout();
/**
* Set the Idle Timeout.
*
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setIdleTimeout(Duration timeout);
/**
* Set the Write Timeout.
*
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setWriteTimeout(Duration timeout);
boolean isAutoFragment();
void setAutoFragment(boolean autoFragment);
long getMaxFrameSize();
void setMaxFrameSize(long maxFrameSize);
int getOutputBufferSize();
void setOutputBufferSize(int outputBufferSize);
int getInputBufferSize();
void setInputBufferSize(int inputBufferSize);
long getMaxBinaryMessageSize();
void setMaxBinaryMessageSize(long maxSize);
long getMaxTextMessageSize();
void setMaxTextMessageSize(long maxSize);
interface Customizer
{
void customize(Configuration configurable);
}
class ConfigurationCustomizer implements Configuration, Customizer
{
private Duration idleTimeout;
private Duration writeTimeout;
private Boolean autoFragment;
private Long maxFrameSize;
private Integer outputBufferSize;
private Integer inputBufferSize;
private Long maxBinaryMessageSize;
private Long maxTextMessageSize;
@Override
public Duration getIdleTimeout()
{
return idleTimeout == null ? WebSocketConstants.DEFAULT_IDLE_TIMEOUT : idleTimeout;
}
@Override
public Duration getWriteTimeout()
{
return writeTimeout == null ? WebSocketConstants.DEFAULT_WRITE_TIMEOUT : writeTimeout;
}
@Override
public void setIdleTimeout(Duration timeout)
{
this.idleTimeout = timeout;
}
@Override
public void setWriteTimeout(Duration timeout)
{
this.writeTimeout = timeout;
}
@Override
public boolean isAutoFragment()
{
return autoFragment == null ? WebSocketConstants.DEFAULT_AUTO_FRAGMENT : autoFragment;
}
@Override
public void setAutoFragment(boolean autoFragment)
{
this.autoFragment = autoFragment;
}
@Override
public long getMaxFrameSize()
{
return maxFrameSize == null ? WebSocketConstants.DEFAULT_MAX_FRAME_SIZE : maxFrameSize;
}
@Override
public void setMaxFrameSize(long maxFrameSize)
{
this.maxFrameSize = maxFrameSize;
}
@Override
public int getOutputBufferSize()
{
return outputBufferSize == null ? WebSocketConstants.DEFAULT_OUTPUT_BUFFER_SIZE : outputBufferSize;
}
@Override
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
@Override
public int getInputBufferSize()
{
return inputBufferSize == null ? WebSocketConstants.DEFAULT_INPUT_BUFFER_SIZE : inputBufferSize;
}
@Override
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
}
@Override
public long getMaxBinaryMessageSize()
{
return maxBinaryMessageSize == null ? WebSocketConstants.DEFAULT_MAX_BINARY_MESSAGE_SIZE : maxBinaryMessageSize;
}
@Override
public void setMaxBinaryMessageSize(long maxBinaryMessageSize)
{
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
@Override
public long getMaxTextMessageSize()
{
return maxTextMessageSize == null ? WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE : maxTextMessageSize;
}
@Override
public void setMaxTextMessageSize(long maxTextMessageSize)
{
this.maxTextMessageSize = maxTextMessageSize;
}
@Override
public void customize(Configuration configurable)
{
if (idleTimeout != null)
configurable.setIdleTimeout(idleTimeout);
if (writeTimeout != null)
configurable.setWriteTimeout(writeTimeout);
if (autoFragment != null)
configurable.setAutoFragment(autoFragment);
if (maxFrameSize != null)
configurable.setMaxFrameSize(maxFrameSize);
if (inputBufferSize != null)
configurable.setInputBufferSize(inputBufferSize);
if (outputBufferSize != null)
configurable.setOutputBufferSize(outputBufferSize);
if (maxBinaryMessageSize != null)
configurable.setMaxBinaryMessageSize(maxBinaryMessageSize);
if (maxTextMessageSize != null)
configurable.setMaxTextMessageSize(maxTextMessageSize);
}
}
}

View File

@ -0,0 +1,270 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
/**
* Represents the outgoing Frames.
*/
public interface CoreSession extends OutgoingFrames, Configuration
{
/**
* The negotiated WebSocket Sub-Protocol for this session.
*
* @return the negotiated WebSocket Sub-Protocol for this session.
*/
String getNegotiatedSubProtocol();
/**
* The negotiated WebSocket Extension Configurations for this session.
*
* @return the list of Negotiated Extension Configurations for this session.
*/
List<ExtensionConfig> getNegotiatedExtensions();
/**
* The parameter map (from URI Query) for the active session.
*
* @return the immutable map of parameters
*/
Map<String, List<String>> getParameterMap();
/**
* The active {@code Sec-WebSocket-Version} (protocol version) in use.
*
* @return the protocol version in use.
*/
String getProtocolVersion();
/**
* The active connection's Request URI.
* This is the URI of the upgrade request and is typically http: or https: rather than
* the ws: or wss: scheme.
*
* @return the absolute URI (including Query string)
*/
URI getRequestURI();
/**
* The active connection's Secure status indicator.
*
* @return true if connection is secure (similar in role to {@code HttpServletRequest.isSecure()})
*/
boolean isSecure();
/**
* @return Client or Server behaviour
*/
Behavior getBehavior();
/**
* @return The shared ByteBufferPool
*/
ByteBufferPool getByteBufferPool();
/**
* The Local Socket Address for the connection
* <p>
* Do not assume that this will return a {@link InetSocketAddress} in all cases.
* Use of various proxies, and even UnixSockets can result a SocketAddress being returned
* without supporting {@link InetSocketAddress}
* </p>
*
* @return the SocketAddress for the local connection, or null if not supported by Session
*/
SocketAddress getLocalAddress();
/**
* The Remote Socket Address for the connection
* <p>
* Do not assume that this will return a {@link InetSocketAddress} in all cases.
* Use of various proxies, and even UnixSockets can result a SocketAddress being returned
* without supporting {@link InetSocketAddress}
* </p>
*
* @return the SocketAddress for the remote connection, or null if not supported by Session
*/
SocketAddress getRemoteAddress();
/**
* @return True if the websocket is open outbound
*/
boolean isOutputOpen();
/**
* If using BatchMode.ON or BatchMode.AUTO, trigger a flush of enqueued / batched frames.
*
* @param callback the callback to track close frame sent (or failed)
*/
void flush(Callback callback);
/**
* Initiate close handshake, no payload (no declared status code or reason phrase)
*
* @param callback the callback to track close frame sent (or failed)
*/
void close(Callback callback);
/**
* Initiate close handshake with provide status code and optional reason phrase.
*
* @param statusCode the status code (should be a valid status code that can be sent)
* @param reason optional reason phrase (will be truncated automatically by implementation to fit within limits of protocol)
* @param callback the callback to track close frame sent (or failed)
*/
void close(int statusCode, String reason, Callback callback);
/**
* Issue a harsh abort of the underlying connection.
* <p>
* This will terminate the connection, without sending a websocket close frame.
* No WebSocket Protocol close handshake will be performed.
* </p>
* <p>
* Once called, any read/write activity on the websocket from this point will be indeterminate.
* This can result in the {@link FrameHandler#onError(Throwable, Callback)} event being called indicating any issue that arises.
* </p>
* <p>
* Once the underlying connection has been determined to be closed, the {@link FrameHandler#onClosed(CloseStatus, Callback)} event will be called.
* </p>
*/
void abort();
/**
* Manage flow control by indicating demand for handling Frames. A call to
* {@link FrameHandler#onFrame(Frame, Callback)} will only be made if a
* corresponding demand has been signaled. It is an error to call this method
* if {@link FrameHandler#isDemanding()} returns false.
*
* @param n The number of frames that can be handled (in sequential calls to
* {@link FrameHandler#onFrame(Frame, Callback)}). May not be negative.
*/
void demand(long n);
class Empty extends ConfigurationCustomizer implements CoreSession
{
@Override
public String getNegotiatedSubProtocol()
{
return null;
}
@Override
public List<ExtensionConfig> getNegotiatedExtensions()
{
return null;
}
@Override
public Map<String, List<String>> getParameterMap()
{
return null;
}
@Override
public String getProtocolVersion()
{
return null;
}
@Override
public URI getRequestURI()
{
return null;
}
@Override
public boolean isSecure()
{
return false;
}
@Override
public void abort()
{
}
@Override
public Behavior getBehavior()
{
return null;
}
@Override
public ByteBufferPool getByteBufferPool()
{
return null;
}
@Override
public SocketAddress getLocalAddress()
{
return null;
}
@Override
public SocketAddress getRemoteAddress()
{
return null;
}
@Override
public boolean isOutputOpen()
{
return false;
}
@Override
public void flush(Callback callback)
{
callback.succeeded();
}
@Override
public void close(Callback callback)
{
callback.succeeded();
}
@Override
public void close(int statusCode, String reason, Callback callback)
{
callback.succeeded();
}
@Override
public void demand(long n)
{
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
callback.succeeded();
}
}
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
/**
* Interface for WebSocket Extensions.
* <p>
@ -88,7 +86,8 @@ public interface Extension extends IncomingFrames, OutgoingFrames
void setNextOutgoingFrames(OutgoingFrames nextOutgoing);
/**
* Set the {@link WebSocketCoreSession} for this Extension
* Set the {@link CoreSession} for this Extension.
* @param coreSession
*/
void setWebSocketCoreSession(WebSocketCoreSession coreSession);
void setCoreSession(CoreSession coreSession);
}

View File

@ -18,14 +18,6 @@
package org.eclipse.jetty.websocket.core;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.server.Negotiation;
@ -129,443 +121,4 @@ public interface FrameHandler extends IncomingFrames
{
return false;
}
interface Configuration
{
/**
* Get the Idle Timeout
*
* @return the idle timeout
*/
Duration getIdleTimeout();
/**
* Get the Write Timeout
*
* @return the write timeout
*/
Duration getWriteTimeout();
/**
* Set the Idle Timeout.
*
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setIdleTimeout(Duration timeout);
/**
* Set the Write Timeout.
*
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setWriteTimeout(Duration timeout);
boolean isAutoFragment();
void setAutoFragment(boolean autoFragment);
long getMaxFrameSize();
void setMaxFrameSize(long maxFrameSize);
int getOutputBufferSize();
void setOutputBufferSize(int outputBufferSize);
int getInputBufferSize();
void setInputBufferSize(int inputBufferSize);
long getMaxBinaryMessageSize();
void setMaxBinaryMessageSize(long maxSize);
long getMaxTextMessageSize();
void setMaxTextMessageSize(long maxSize);
}
/**
* Represents the outgoing Frames.
*/
interface CoreSession extends OutgoingFrames, Configuration
{
/**
* The negotiated WebSocket Sub-Protocol for this session.
*
* @return the negotiated WebSocket Sub-Protocol for this session.
*/
String getNegotiatedSubProtocol();
/**
* The negotiated WebSocket Extension Configurations for this session.
*
* @return the list of Negotiated Extension Configurations for this session.
*/
List<ExtensionConfig> getNegotiatedExtensions();
/**
* The parameter map (from URI Query) for the active session.
*
* @return the immutable map of parameters
*/
Map<String, List<String>> getParameterMap();
/**
* The active {@code Sec-WebSocket-Version} (protocol version) in use.
*
* @return the protocol version in use.
*/
String getProtocolVersion();
/**
* The active connection's Request URI.
* This is the URI of the upgrade request and is typically http: or https: rather than
* the ws: or wss: scheme.
*
* @return the absolute URI (including Query string)
*/
URI getRequestURI();
/**
* The active connection's Secure status indicator.
*
* @return true if connection is secure (similar in role to {@code HttpServletRequest.isSecure()})
*/
boolean isSecure();
/**
* @return Client or Server behaviour
*/
Behavior getBehavior();
/**
* @return The shared ByteBufferPool
*/
ByteBufferPool getByteBufferPool();
/**
* The Local Socket Address for the connection
* <p>
* Do not assume that this will return a {@link InetSocketAddress} in all cases.
* Use of various proxies, and even UnixSockets can result a SocketAddress being returned
* without supporting {@link InetSocketAddress}
* </p>
*
* @return the SocketAddress for the local connection, or null if not supported by Session
*/
SocketAddress getLocalAddress();
/**
* The Remote Socket Address for the connection
* <p>
* Do not assume that this will return a {@link InetSocketAddress} in all cases.
* Use of various proxies, and even UnixSockets can result a SocketAddress being returned
* without supporting {@link InetSocketAddress}
* </p>
*
* @return the SocketAddress for the remote connection, or null if not supported by Session
*/
SocketAddress getRemoteAddress();
/**
* @return True if the websocket is open outbound
*/
boolean isOutputOpen();
/**
* If using BatchMode.ON or BatchMode.AUTO, trigger a flush of enqueued / batched frames.
*
* @param callback the callback to track close frame sent (or failed)
*/
void flush(Callback callback);
/**
* Initiate close handshake, no payload (no declared status code or reason phrase)
*
* @param callback the callback to track close frame sent (or failed)
*/
void close(Callback callback);
/**
* Initiate close handshake with provide status code and optional reason phrase.
*
* @param statusCode the status code (should be a valid status code that can be sent)
* @param reason optional reason phrase (will be truncated automatically by implementation to fit within limits of protocol)
* @param callback the callback to track close frame sent (or failed)
*/
void close(int statusCode, String reason, Callback callback);
/**
* Issue a harsh abort of the underlying connection.
* <p>
* This will terminate the connection, without sending a websocket close frame.
* No WebSocket Protocol close handshake will be performed.
* </p>
* <p>
* Once called, any read/write activity on the websocket from this point will be indeterminate.
* This can result in the {@link #onError(Throwable, Callback)} event being called indicating any issue that arises.
* </p>
* <p>
* Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus, Callback)} event will be called.
* </p>
*/
void abort();
/**
* Manage flow control by indicating demand for handling Frames. A call to
* {@link FrameHandler#onFrame(Frame, Callback)} will only be made if a
* corresponding demand has been signaled. It is an error to call this method
* if {@link FrameHandler#isDemanding()} returns false.
*
* @param n The number of frames that can be handled (in sequential calls to
* {@link FrameHandler#onFrame(Frame, Callback)}). May not be negative.
*/
void demand(long n);
class Empty extends ConfigurationCustomizer implements CoreSession
{
@Override
public String getNegotiatedSubProtocol()
{
return null;
}
@Override
public List<ExtensionConfig> getNegotiatedExtensions()
{
return null;
}
@Override
public Map<String, List<String>> getParameterMap()
{
return null;
}
@Override
public String getProtocolVersion()
{
return null;
}
@Override
public URI getRequestURI()
{
return null;
}
@Override
public boolean isSecure()
{
return false;
}
@Override
public void abort()
{
}
@Override
public Behavior getBehavior()
{
return null;
}
@Override
public ByteBufferPool getByteBufferPool()
{
return null;
}
@Override
public SocketAddress getLocalAddress()
{
return null;
}
@Override
public SocketAddress getRemoteAddress()
{
return null;
}
@Override
public boolean isOutputOpen()
{
return false;
}
@Override
public void flush(Callback callback)
{
}
@Override
public void close(Callback callback)
{
}
@Override
public void close(int statusCode, String reason, Callback callback)
{
}
@Override
public void demand(long n)
{
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
}
}
}
interface Customizer
{
void customize(Configuration configurable);
}
class ConfigurationHolder implements Configuration
{
protected Duration idleTimeout;
protected Duration writeTimeout;
protected Boolean autoFragment;
protected Long maxFrameSize;
protected Integer outputBufferSize;
protected Integer inputBufferSize;
protected Long maxBinaryMessageSize;
protected Long maxTextMessageSize;
@Override
public Duration getIdleTimeout()
{
return idleTimeout == null ? WebSocketConstants.DEFAULT_IDLE_TIMEOUT : idleTimeout;
}
@Override
public Duration getWriteTimeout()
{
return writeTimeout == null ? WebSocketConstants.DEFAULT_WRITE_TIMEOUT : writeTimeout;
}
@Override
public void setIdleTimeout(Duration timeout)
{
this.idleTimeout = timeout;
}
@Override
public void setWriteTimeout(Duration timeout)
{
this.writeTimeout = timeout;
}
@Override
public boolean isAutoFragment()
{
return autoFragment == null ? WebSocketConstants.DEFAULT_AUTO_FRAGMENT : autoFragment;
}
@Override
public void setAutoFragment(boolean autoFragment)
{
this.autoFragment = autoFragment;
}
@Override
public long getMaxFrameSize()
{
return maxFrameSize == null ? WebSocketConstants.DEFAULT_MAX_FRAME_SIZE : maxFrameSize;
}
@Override
public void setMaxFrameSize(long maxFrameSize)
{
this.maxFrameSize = maxFrameSize;
}
@Override
public int getOutputBufferSize()
{
return outputBufferSize == null ? WebSocketConstants.DEFAULT_OUTPUT_BUFFER_SIZE : outputBufferSize;
}
@Override
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
@Override
public int getInputBufferSize()
{
return inputBufferSize == null ? WebSocketConstants.DEFAULT_INPUT_BUFFER_SIZE : inputBufferSize;
}
@Override
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
}
@Override
public long getMaxBinaryMessageSize()
{
return maxBinaryMessageSize == null ? WebSocketConstants.DEFAULT_MAX_BINARY_MESSAGE_SIZE : maxBinaryMessageSize;
}
@Override
public void setMaxBinaryMessageSize(long maxBinaryMessageSize)
{
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
@Override
public long getMaxTextMessageSize()
{
return maxTextMessageSize == null ? WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE : maxTextMessageSize;
}
@Override
public void setMaxTextMessageSize(long maxTextMessageSize)
{
this.maxTextMessageSize = maxTextMessageSize;
}
}
class ConfigurationCustomizer extends ConfigurationHolder implements Customizer
{
@Override
public void customize(Configuration configurable)
{
if (idleTimeout != null)
configurable.setIdleTimeout(idleTimeout);
if (writeTimeout != null)
configurable.setWriteTimeout(writeTimeout);
if (autoFragment != null)
configurable.setAutoFragment(autoFragment);
if (maxFrameSize != null)
configurable.setMaxFrameSize(maxFrameSize);
if (inputBufferSize != null)
configurable.setInputBufferSize(inputBufferSize);
if (outputBufferSize != null)
configurable.setOutputBufferSize(outputBufferSize);
if (maxBinaryMessageSize != null)
configurable.setMaxBinaryMessageSize(maxBinaryMessageSize);
if (maxTextMessageSize != null)
configurable.setMaxTextMessageSize(maxTextMessageSize);
}
public static ConfigurationCustomizer from(ConfigurationCustomizer parent, ConfigurationCustomizer child)
{
ConfigurationCustomizer customizer = new ConfigurationCustomizer();
parent.customize(customizer);
child.customize(customizer);
return customizer;
}
}
}

View File

@ -29,13 +29,15 @@ import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.exception.BadPayloadException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
/**
* A utility implementation of FrameHandler that defragments
* text frames into a String message before calling {@link #onText(String, Callback)}.
* Flow control is by default automatic, but an implementation
* may extend {@link #isDemanding()} to return true and then explicityly control
* demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)}
* demand with calls to {@link CoreSession#demand(long)}
*/
public class MessageHandler implements FrameHandler
{

View File

@ -26,7 +26,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -44,20 +43,20 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.UpgradeException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.UpgradeException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
@ -78,10 +77,10 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
}
private static final Logger LOG = Log.getLogger(ClientUpgradeRequest.class);
protected final CompletableFuture<FrameHandler.CoreSession> futureCoreSession;
protected final CompletableFuture<CoreSession> futureCoreSession;
private final WebSocketCoreClient wsClient;
private FrameHandler frameHandler;
private FrameHandler.ConfigurationCustomizer customizer = new FrameHandler.ConfigurationCustomizer();
private Configuration.ConfigurationCustomizer customizer = new Configuration.ConfigurationCustomizer();
private List<UpgradeListener> upgradeListeners = new ArrayList<>();
private List<ExtensionConfig> requestedExtensions = new ArrayList<>();
@ -115,7 +114,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
this.futureCoreSession = new CompletableFuture<>();
}
public void setConfiguration(FrameHandler.ConfigurationCustomizer config)
public void setConfiguration(Configuration.ConfigurationCustomizer config)
{
config.customize(customizer);
}
@ -190,7 +189,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
super.send(listener);
}
public CompletableFuture<FrameHandler.CoreSession> sendAsync()
public CompletableFuture<CoreSession> sendAsync()
{
send(this);
return futureCoreSession;
@ -272,16 +271,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
}
protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Negotiated negotiated)
{
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler();
void requestComplete()
@ -436,11 +425,11 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
extensionStack,
WebSocketConstants.SPEC_VERSION_STRING);
WebSocketCoreSession coreSession = newWebSocketCoreSession(frameHandler, negotiated);
WebSocketCoreSession coreSession = new WebSocketCoreSession(frameHandler, Behavior.CLIENT, negotiated);
customizer.customize(coreSession);
HttpClient httpClient = wsClient.getHttpClient();
WebSocketConnection wsConnection = newWebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);
notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
@ -68,13 +69,13 @@ public class WebSocketCoreClient extends ContainerLifeCycle
addBean(httpClient);
}
public CompletableFuture<FrameHandler.CoreSession> connect(FrameHandler frameHandler, URI wsUri) throws IOException
public CompletableFuture<CoreSession> connect(FrameHandler frameHandler, URI wsUri) throws IOException
{
ClientUpgradeRequest request = ClientUpgradeRequest.from(this, wsUri, frameHandler);
return connect(request);
}
public CompletableFuture<FrameHandler.CoreSession> connect(ClientUpgradeRequest request) throws IOException
public CompletableFuture<CoreSession> connect(ClientUpgradeRequest request) throws IOException
{
if (!isStarted())
throw new IllegalStateException(WebSocketCoreClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");

View File

@ -16,7 +16,9 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
import org.eclipse.jetty.websocket.core.CloseStatus;
/**
* Exception to terminate the connection because it has received data within a frame payload that was not consistent with the requirements of that frame

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
@SuppressWarnings("serial")
public class CloseException extends WebSocketException

View File

@ -16,7 +16,9 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
import org.eclipse.jetty.websocket.core.CloseStatus;
/**
* Exception when a message is too large for the internal buffers occurs and should trigger a connection close.

View File

@ -16,7 +16,9 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
import org.eclipse.jetty.websocket.core.CloseStatus;
/**
* Per spec, a protocol error should result in a Close frame of status code 1002 (PROTOCOL_ERROR)

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
import java.net.URI;

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
/**
* A recoverable exception within the websocket framework.

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
/**
* Exception thrown to indicate a connection I/O timeout.

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.exception;
public class WebSocketWriteTimeoutException extends WebSocketTimeoutException
{

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
/**
* Represents the stack of Extensions.
@ -254,7 +254,7 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
for (Extension extension : extensions)
{
extension.setWebSocketCoreSession(coreSession);
extension.setCoreSession(coreSession);
}
}

View File

@ -22,9 +22,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
/**
@ -35,7 +35,7 @@ public class FragmentExtension extends AbstractExtension
private static final Logger LOG = Log.getLogger(FragmentExtension.class);
private final FragmentingFlusher flusher;
private final FrameHandler.Configuration configuration = new FrameHandler.ConfigurationHolder();
private final Configuration configuration = new Configuration.ConfigurationCustomizer();
public FragmentExtension()
{

View File

@ -23,8 +23,8 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.Configuration;
import org.eclipse.jetty.websocket.core.OpCode;
/**

View File

@ -41,8 +41,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketWriteTimeoutException;
public class FrameFlusher extends IteratingCallback
{

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
public class FrameSequence
{

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.util.Utf8Appendable;

View File

@ -27,13 +27,12 @@ import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.Configuration;
import org.eclipse.jetty.websocket.core.FrameHandler.ConfigurationHolder;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
/**
* Parsing of a frames in WebSocket land.
@ -65,7 +64,7 @@ public class Parser
public Parser(ByteBufferPool bufferPool)
{
this(bufferPool, new ConfigurationHolder());
this(bufferPool, new Configuration.ConfigurationCustomizer());
}
public Parser(ByteBufferPool bufferPool, Configuration configuration)

View File

@ -30,13 +30,13 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.BadPayloadException;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.BadPayloadException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
/**
* Per Message Deflate Compression extension for WebSocket.
@ -269,7 +269,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
private boolean deflate(Callback callback)
{
// Get a buffer for the inflated payload.
long maxFrameSize = getWebSocketCoreSession().getMaxFrameSize();
long maxFrameSize = getConfiguration().getMaxFrameSize();
int bufferSize = (maxFrameSize <= 0) ? deflateBufferSize : (int)Math.min(maxFrameSize, deflateBufferSize);
final ByteBuffer buffer = getBufferPool().acquire(bufferSize, false);
callback = Callback.from(callback, () -> getBufferPool().release(buffer));
@ -289,7 +289,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
if (buffer.limit() == bufferSize)
{
// We need to fragment. TODO: what if there was only bufferSize of content?
if (!getWebSocketCoreSession().isAutoFragment())
if (!getConfiguration().isAutoFragment())
throw new MessageTooLargeException("Deflated payload exceeded the compress buffer size");
break;
}
@ -402,7 +402,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
private boolean inflate(Callback callback) throws DataFormatException
{
// Get a buffer for the inflated payload.
long maxFrameSize = getWebSocketCoreSession().getMaxFrameSize();
long maxFrameSize = getConfiguration().getMaxFrameSize();
int bufferSize = (maxFrameSize <= 0) ? inflateBufferSize : (int)Math.min(maxFrameSize, inflateBufferSize);
final ByteBuffer payload = getBufferPool().acquire(bufferSize, false);
callback = Callback.from(callback, () -> getBufferPool().release(payload));
@ -421,7 +421,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
if (payload.limit() == bufferSize)
{
// We need to fragment. TODO: what if there was only bufferSize of content?
if (!getWebSocketCoreSession().isAutoFragment())
if (!getConfiguration().isAutoFragment())
throw new MessageTooLargeException("Inflated payload exceeded the decompress buffer size");
break;
}

View File

@ -24,11 +24,11 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.NullAppendable;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import static org.eclipse.jetty.websocket.core.OpCode.CONTINUATION;
import static org.eclipse.jetty.websocket.core.OpCode.TEXT;
@ -38,6 +38,7 @@ public class ValidationExtension extends AbstractExtension
{
private static final Logger LOG = Log.getLogger(ValidationExtension.class);
private WebSocketCoreSession coreSession;
private FrameSequence incomingSequence = null;
private FrameSequence outgoingSequence = null;
private boolean incomingFrameValidation = false;
@ -53,6 +54,17 @@ public class ValidationExtension extends AbstractExtension
return "@validation";
}
@Override
public void setCoreSession(CoreSession coreSession)
{
super.setCoreSession(coreSession);
// TODO: change validation to use static methods instead of down casting CoreSession.
if (!(coreSession instanceof WebSocketCoreSession))
throw new IllegalArgumentException("ValidationExtension needs a CoreSession Configuration");
this.coreSession = (WebSocketCoreSession)coreSession;
}
@Override
public void onFrame(Frame frame, Callback callback)
{
@ -62,7 +74,7 @@ public class ValidationExtension extends AbstractExtension
incomingSequence.check(frame.getOpCode(), frame.isFin());
if (incomingFrameValidation)
getWebSocketCoreSession().assertValidIncoming(frame);
coreSession.assertValidIncoming(frame);
if (incomingUtf8Validation != null)
validateUTF8(frame, incomingUtf8Validation, continuedInOpCode);
@ -85,7 +97,7 @@ public class ValidationExtension extends AbstractExtension
outgoingSequence.check(frame.getOpCode(), frame.isFin());
if (outgoingFrameValidation)
getWebSocketCoreSession().assertValidOutgoing(frame);
coreSession.assertValidOutgoing(frame);
if (outgoingUtf8Validation != null)
validateUTF8(frame, outgoingUtf8Validation, continuedOutOpCode);

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.exception.WebSocketTimeoutException;
/**
* Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket

View File

@ -36,19 +36,21 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.exception.WebSocketWriteTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import static org.eclipse.jetty.util.Callback.NOOP;
@ -56,7 +58,7 @@ import static org.eclipse.jetty.util.Callback.NOOP;
/**
* The Core WebSocket Session.
*/
public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSession, Dumpable
public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpable
{
private static final Logger LOG = Log.getLogger(WebSocketCoreSession.class);
private static final CloseStatus NO_CODE = new CloseStatus(CloseStatus.NO_CODE);
@ -809,7 +811,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
private class Flusher extends FragmentingFlusher
{
public Flusher(FrameHandler.Configuration configuration)
public Flusher(Configuration configuration)
{
super(configuration);
}

View File

@ -23,7 +23,7 @@ import java.nio.channels.ClosedChannelException;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
/**
* Atomic Connection State
@ -142,7 +142,7 @@ public class WebSocketSessionState
}
}
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
public boolean onOutgoingFrame(Frame frame) throws Exception
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
@ -150,7 +150,7 @@ public class WebSocketSessionState
synchronized (this)
{
if (!isOutputOpen())
throw new IllegalStateException(_sessionState.toString());
throw new ClosedChannelException();
if (opcode == OpCode.CLOSE)
{

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.server.internal.HandshakerSelector;
public interface Handshaker
@ -32,5 +32,5 @@ public interface Handshaker
return new HandshakerSelector();
}
boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException;
boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, Configuration.Customizer defaultCustomizer) throws IOException;
}

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.QuotedCSV;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
@ -184,24 +183,6 @@ public abstract class Negotiation
extensionStack = null;
}
public ExtensionStack getExtensionStack()
{
if (extensionStack == null)
{
// Extension stack can decide to drop any of these extensions or their parameters
extensionStack = new ExtensionStack(components, Behavior.SERVER);
extensionStack.negotiate(offeredExtensions, negotiatedExtensions);
negotiatedExtensions = extensionStack.getNegotiatedExtensions();
if (extensionStack.hasNegotiatedExtensions())
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS,
ExtensionConfig.toHeaderValue(negotiatedExtensions));
else
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, null);
}
return extensionStack;
}
@Override
public String toString()
{

View File

@ -23,11 +23,12 @@ import java.util.function.Function;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
public interface WebSocketNegotiator extends FrameHandler.Customizer
public interface WebSocketNegotiator extends Configuration.Customizer
{
FrameHandler negotiate(Negotiation negotiation) throws IOException;
@ -51,7 +52,7 @@ public interface WebSocketNegotiator extends FrameHandler.Customizer
};
}
static WebSocketNegotiator from(Function<Negotiation, FrameHandler> negotiate, FrameHandler.Customizer customizer)
static WebSocketNegotiator from(Function<Negotiation, FrameHandler> negotiate, Configuration.Customizer customizer)
{
return new AbstractNegotiator(null, customizer)
{
@ -66,7 +67,7 @@ public interface WebSocketNegotiator extends FrameHandler.Customizer
static WebSocketNegotiator from(
Function<Negotiation, FrameHandler> negotiate,
WebSocketComponents components,
FrameHandler.Customizer customizer)
Configuration.Customizer customizer)
{
return new AbstractNegotiator(components, customizer)
{
@ -81,21 +82,21 @@ public interface WebSocketNegotiator extends FrameHandler.Customizer
abstract class AbstractNegotiator implements WebSocketNegotiator
{
final WebSocketComponents components;
final FrameHandler.Customizer customizer;
final Configuration.Customizer customizer;
public AbstractNegotiator()
{
this(null, null);
}
public AbstractNegotiator(WebSocketComponents components, FrameHandler.Customizer customizer)
public AbstractNegotiator(WebSocketComponents components, Configuration.Customizer customizer)
{
this.components = components == null ? new WebSocketComponents() : components;
this.customizer = customizer;
}
@Override
public void customize(FrameHandler.Configuration configurable)
public void customize(Configuration configurable)
{
if (customizer != null)
customizer.customize(configurable);
@ -125,7 +126,7 @@ public interface WebSocketNegotiator extends FrameHandler.Customizer
return components;
}
public FrameHandler.Customizer getCustomizer()
public Configuration.Customizer getCustomizer()
{
return customizer;
}

View File

@ -38,11 +38,12 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
@ -57,12 +58,13 @@ public abstract class AbstractHandshaker implements Handshaker
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, Configuration.Customizer defaultCustomizer) throws IOException
{
if (!validateRequest(request))
return false;
Negotiation negotiation = newNegotiation(request, response, negotiator.getWebSocketComponents());
WebSocketComponents components = negotiator.getWebSocketComponents();
Negotiation negotiation = newNegotiation(request, response, components);
if (LOG.isDebugEnabled())
LOG.debug("negotiation {}", negotiation);
negotiation.negotiate();
@ -123,8 +125,14 @@ public abstract class AbstractHandshaker implements Handshaker
throw new WebSocketException("Upgrade failed: multiple negotiated extensions of the same name");
}
// Create and Negotiate the ExtensionStack
ExtensionStack extensionStack = negotiation.getExtensionStack();
// Create and Negotiate the ExtensionStack. (ExtensionStack can drop any extensions or their parameters.)
ExtensionStack extensionStack = new ExtensionStack(components, Behavior.SERVER);
extensionStack.negotiate(negotiation.getOfferedExtensions(), negotiation.getNegotiatedExtensions());
negotiation.setNegotiatedExtensions(extensionStack.getNegotiatedExtensions());
if (extensionStack.hasNegotiatedExtensions())
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, ExtensionConfig.toHeaderValue(negotiation.getNegotiatedExtensions()));
else
baseRequest.getResponse().setHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, null);
Negotiated negotiated = new Negotiated(baseRequest.getHttpURI().toURI(), protocol, baseRequest.isSecure(), extensionStack, WebSocketConstants.SPEC_VERSION_STRING);

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.server.Handshaker;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
@ -37,7 +37,7 @@ public class HandshakerSelector implements Handshaker
private final RFC8441Handshaker rfc8441 = new RFC8441Handshaker();
@Override
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, FrameHandler.Customizer defaultCustomizer) throws IOException
public boolean upgradeRequest(WebSocketNegotiator negotiator, HttpServletRequest request, HttpServletResponse response, Configuration.Customizer defaultCustomizer) throws IOException
{
// Try HTTP/1.1 WS upgrade, if this fails try an HTTP/2 WS upgrade if no response was committed.
return rfc6455.upgradeRequest(negotiator, request, response, defaultCustomizer) ||

View File

@ -74,7 +74,7 @@ public class AutoFragmentTest
public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, serverUri);
CompletableFuture<CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the server.
@ -122,7 +122,7 @@ public class AutoFragmentTest
public void testIncomingAutoFragmentToMaxFrameSize() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, serverUri);
CompletableFuture<CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.
@ -167,7 +167,7 @@ public class AutoFragmentTest
TestFrameHandler clientHandler = new TestFrameHandler();
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, clientHandler);
upgradeRequest.addExtensions("permessage-deflate");
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.
@ -220,7 +220,7 @@ public class AutoFragmentTest
TestFrameHandler clientHandler = new TestFrameHandler();
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, clientHandler);
upgradeRequest.addExtensions("permessage-deflate");
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.
@ -283,7 +283,7 @@ public class AutoFragmentTest
TestFrameHandler clientHandler = new TestFrameHandler();
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, clientHandler);
upgradeRequest.addExtensions("permessage-deflate");
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.

View File

@ -0,0 +1,135 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlushTest
{
private WebSocketServer server;
private TestFrameHandler serverHandler = new TestFrameHandler();
private WebSocketCoreClient client;
private WebSocketComponents components = new WebSocketComponents();
@BeforeEach
public void startup() throws Exception
{
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverHandler);
server = new WebSocketServer(negotiator);
client = new WebSocketCoreClient(null, components);
server.start();
client.start();
}
@AfterEach
public void shutdown() throws Exception
{
server.stop();
client.stop();
}
@Test
public void testStandardFlush() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
// Send a batched frame.
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
// We have batched the frame and not sent it.
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
// Once we flush the frame is received.
clientHandler.getCoreSession().flush(Callback.NOOP);
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
clientHandler.sendClose();
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
}
@Test
public void testFlushOnCloseFrame() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
// Send a batched frame.
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
// We have batched the frame and not sent it.
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
// Sending the close initiates the flush and the frame is received.
clientHandler.sendClose();
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
}
@Test
public void testFlushAfterClose() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
Callback.Completable flushCallback = new Callback.Completable();
clientHandler.getCoreSession().flush(flushCallback);
ExecutionException e = assertThrows(ExecutionException.class, () -> flushCallback.get(5, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(ClosedChannelException.class));
}
}

View File

@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.junit.jupiter.api.AfterEach;

View File

@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.stream.Stream;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Negotiated;

View File

@ -30,6 +30,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Negotiated;

View File

@ -29,7 +29,8 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.exception.BadPayloadException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.FrameSequence;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.api.Test;

View File

@ -29,6 +29,8 @@ import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.hamcrest.Matchers;

View File

@ -53,7 +53,7 @@ public class TestWebSocketNegotiator implements WebSocketNegotiator
}
@Override
public void customize(FrameHandler.Configuration configurable)
public void customize(Configuration configurable)
{
}

View File

@ -19,9 +19,12 @@
package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.NetworkConnector;
@ -48,10 +51,13 @@ import org.junit.jupiter.params.provider.ValueSource;
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -425,6 +431,97 @@ public class WebSocketCloseTest extends WebSocketTester
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleNormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 1", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 2", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
// Normal close frame received on client.
Frame closeFrame = receiveFrame(client.getInputStream());
assertThat(closeFrame.getOpCode(), is(OpCode.CLOSE));
CloseStatus closeStatus = CloseStatus.getCloseStatus(closeFrame);
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(closeStatus.getReason(), is("normal 1"));
// Send close response from client.
client.getOutputStream().write(RawFrameBuilder.buildClose(
new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true));
server.handler.getCoreSession().demand(1);
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
Callback closeFrameCallback = Objects.requireNonNull(server.handler.receivedCallback.poll());
closeFrameCallback.succeeded();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.closeStatus.getReason(), is("normal response 1"));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleAbnormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "server error should succeed", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.PROTOCOL, "protocol error should fail", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("server error should succeed"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleCloseAbnormalOvertakesNormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal close (client does not complete close handshake)", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "error close should overtake normal close", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("error close should overtake normal close"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
}
static class DemandingTestFrameHandler implements SynchronousFrameHandler
{
private CoreSession coreSession;

View File

@ -37,10 +37,10 @@ import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.UpgradeListener;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.exception.UpgradeException;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;

View File

@ -59,7 +59,7 @@ public class WebSocketOpenTest extends WebSocketTester
server.stop();
}
public void setup(BiFunction<FrameHandler.CoreSession, Callback, Void> onOpen) throws Exception
public void setup(BiFunction<CoreSession, Callback, Void> onOpen) throws Exception
{
serverHandler = new DemandingAsyncFrameHandler(onOpen);
server = new WebSocketServer(serverHandler);
@ -136,7 +136,7 @@ public class WebSocketOpenTest extends WebSocketTester
@Test
public void testAsyncOnOpen() throws Exception
{
Exchanger<FrameHandler.CoreSession> sx = new Exchanger<>();
Exchanger<CoreSession> sx = new Exchanger<>();
Exchanger<Callback> cx = new Exchanger<>();
setup((s, c) ->
{
@ -153,7 +153,7 @@ public class WebSocketOpenTest extends WebSocketTester
return null;
});
FrameHandler.CoreSession coreSession = sx.exchange(null);
CoreSession coreSession = sx.exchange(null);
Callback onOpenCallback = cx.exchange(null);
Thread.sleep(100);

View File

@ -47,6 +47,7 @@ public class WebSocketTester
private static String NON_RANDOM_KEY = Base64.getEncoder().encodeToString("0123456701234567".getBytes());
private static SslContextFactory.Client sslContextFactory;
protected ByteBufferPool bufferPool;
protected ByteBuffer buffer;
protected Parser parser;
@BeforeAll
@ -159,33 +160,34 @@ public class WebSocketTester
protected Parser.ParsedFrame receiveFrame(InputStream in) throws IOException
{
ByteBuffer buffer = bufferPool.acquire(4096, false);
if (buffer == null)
buffer = bufferPool.acquire(4096, false);
while (true)
{
Parser.ParsedFrame frame = parser.parse(buffer);
if (!buffer.hasRemaining())
BufferUtil.clear(buffer);
if (frame != null)
return frame;
int p = BufferUtil.flipToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return null;
buffer.position(buffer.position() + len);
BufferUtil.flipToFlush(buffer, p);
Parser.ParsedFrame frame = parser.parse(buffer);
if (frame != null)
return frame;
}
}
protected void receiveEof(InputStream in) throws IOException
{
ByteBuffer buffer = bufferPool.acquire(4096, false);
while (true)
{
BufferUtil.flipToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return;
BufferUtil.clearToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return;
throw new IllegalStateException("unexpected content");
}
throw new IllegalStateException("unexpected content");
}
}

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
public class AutobahnFrameHandler extends TestMessageHandler

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.UrlEncoded;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
@ -154,7 +154,7 @@ public class CoreAutobahnClient
{
URI wsUri = baseWebsocketUri.resolve("/getCaseCount");
TestMessageHandler onCaseCount = new TestMessageHandler();
Future<FrameHandler.CoreSession> response = client.connect(onCaseCount, wsUri);
Future<CoreSession> response = client.connect(onCaseCount, wsUri);
if (waitForUpgrade(wsUri, response))
{
@ -173,7 +173,7 @@ public class CoreAutobahnClient
LOG.info("test uri: {}", wsUri);
AutobahnFrameHandler echoHandler = new AutobahnFrameHandler();
Future<FrameHandler.CoreSession> response = client.connect(echoHandler, wsUri);
Future<CoreSession> response = client.connect(echoHandler, wsUri);
if (waitForUpgrade(wsUri, response))
{
// Wait up to 5 min as some of the tests can take a while
@ -201,14 +201,14 @@ public class CoreAutobahnClient
{
URI wsUri = baseWebsocketUri.resolve("/updateReports?agent=" + UrlEncoded.encodeString(userAgent));
TestMessageHandler onUpdateReports = new TestMessageHandler();
Future<FrameHandler.CoreSession> response = client.connect(onUpdateReports, wsUri);
Future<CoreSession> response = client.connect(onUpdateReports, wsUri);
response.get(5, TimeUnit.SECONDS);
assertTrue(onUpdateReports.closeLatch.await(15, TimeUnit.SECONDS));
LOG.info("Reports updated.");
LOG.info("Test suite finished!");
}
private boolean waitForUpgrade(URI wsUri, Future<FrameHandler.CoreSession> response) throws InterruptedException
private boolean waitForUpgrade(URI wsUri, Future<CoreSession> response) throws InterruptedException
{
try
{

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler;
import org.eclipse.jetty.websocket.core.server.Negotiation;

View File

@ -26,8 +26,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketServer;

View File

@ -77,7 +77,7 @@ public class ExtensionTool
{
this.ext = components.getExtensionRegistry().newInstance(extConfig, components);
this.ext.setNextIncomingFrames(capture);
this.ext.setWebSocketCoreSession(newWebSocketCoreSession());
this.ext.setCoreSession(newWebSocketCoreSession());
}
public void parseIncomingHex(String... rawhex)

View File

@ -31,14 +31,14 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Configuration.ConfigurationCustomizer;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.ConfigurationCustomizer;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFramesCapture;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.PerMessageDeflateExtension;
@ -376,7 +376,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
ext.init(config, components);
ext.setWebSocketCoreSession(newSession());
ext.setCoreSession(newSession());
// Setup capture of incoming frames
IncomingFramesCapture capture = new IncomingFramesCapture();
@ -450,7 +450,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ext.init(ExtensionConfig.parse("permessage-deflate"), components);
ext.setWebSocketCoreSession(newSession());
ext.setCoreSession(newSession());
// Setup capture of outgoing frames
OutgoingFramesCapture capture = new OutgoingFramesCapture();
@ -497,7 +497,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
ext.init(config, components);
ext.setWebSocketCoreSession(newSession());
ext.setCoreSession(newSession());
// Setup capture of incoming frames
OutgoingFramesCapture capture = new OutgoingFramesCapture();

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
@ -116,7 +117,7 @@ public class PerMessageDeflaterBufferSizeTest
});
// Connect to the server.
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Make sure the internal parameter was not sent to the server.
@ -170,7 +171,7 @@ public class PerMessageDeflaterBufferSizeTest
});
// Connect to the server.
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Make sure the internal parameter was not sent to the server.
@ -225,7 +226,7 @@ public class PerMessageDeflaterBufferSizeTest
});
// Connect to the server.
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Make sure the internal parameter was not sent from the server.
@ -280,7 +281,7 @@ public class PerMessageDeflaterBufferSizeTest
});
// Connect to the server.
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
CompletableFuture<CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Make sure the internal parameter was not sent from the server.

View File

@ -43,7 +43,7 @@ import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
import org.eclipse.jetty.websocket.core.exception.WebSocketWriteTimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;

View File

@ -39,10 +39,10 @@ import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.EchoFrameHandler;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.TestAsyncFrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
@ -72,7 +72,7 @@ public class WebSocketProxyTest
private WebSocketProxy proxy;
private EchoFrameHandler serverFrameHandler;
private TestHandler testHandler;
FrameHandler.ConfigurationCustomizer defaultCustomizer;
Configuration.ConfigurationCustomizer defaultCustomizer;
private class TestHandler extends AbstractHandler
{
@ -109,7 +109,7 @@ public class WebSocketProxyTest
testHandler = new TestHandler();
handlers.addHandler(testHandler);
defaultCustomizer = new FrameHandler.ConfigurationCustomizer();
defaultCustomizer = new Configuration.ConfigurationCustomizer();
defaultCustomizer.setIdleTimeout(Duration.ofSeconds(3));
ContextHandler serverContext = new ContextHandler("/server");
@ -221,7 +221,7 @@ public class WebSocketProxyTest
CloseStatus closeStatus = CloseStatus.getCloseStatus(clientFrameHandler.receivedFrames.poll());
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response Status Code:"));
assertThat(closeStatus.getReason(), containsString("Failed to upgrade to websocket: Unexpected HTTP Response"));
}
@Test

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.RawFrameBuilder;

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.javax.common;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
/**
* Indicating that the provided Class is not a valid WebSocket per the chosen API.

View File

@ -30,8 +30,8 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.messages.MessageOutputStream;
import org.eclipse.jetty.websocket.javax.common.messages.MessageWriter;
@ -41,7 +41,7 @@ public class JavaxWebSocketAsyncRemote extends JavaxWebSocketRemoteEndpoint impl
{
static final Logger LOG = Log.getLogger(JavaxWebSocketAsyncRemote.class);
protected JavaxWebSocketAsyncRemote(JavaxWebSocketSession session, FrameHandler.CoreSession coreSession)
protected JavaxWebSocketAsyncRemote(JavaxWebSocketSession session, CoreSession coreSession)
{
super(session, coreSession);
}

View File

@ -22,15 +22,16 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.websocket.EncodeException;
import javax.websocket.RemoteEndpoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
@ -40,7 +41,7 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
private static final Logger LOG = Log.getLogger(JavaxWebSocketBasicRemote.class);
protected JavaxWebSocketBasicRemote(JavaxWebSocketSession session, FrameHandler.CoreSession coreSession)
protected JavaxWebSocketBasicRemote(JavaxWebSocketSession session, CoreSession coreSession)
{
super(session, coreSession);
}
@ -65,10 +66,10 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendBinary({})", BufferUtil.toDetailString(data));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
}
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -79,37 +80,36 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendBinary({},{})", BufferUtil.toDetailString(partialByte), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(partialByte);
frame.setFin(isLast);
sendFrame(frame, b, false);
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(partialByte);
frame.setFin(isLast);
FutureCallback b = new FutureCallback();
sendFrame(frame, b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void sendObject(Object data) throws IOException, EncodeException
{
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
super.sendObject(data, b);
}
FutureCallback b = new FutureCallback();
super.sendObject(data, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -120,10 +120,11 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendText({})", TextUtil.hint(text));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
}
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -134,27 +135,33 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendText({},{})", TextUtil.hint(partialMessage), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
sendFrame(frame, b, false);
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
FutureCallback b = new FutureCallback();
sendFrame(frame, b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
private long getBlockingTimeout()
{
long idleTimeout = getIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
}

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
@ -41,9 +41,9 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
{
private static final Logger LOG = Log.getLogger(JavaxWebSocketContainer.class);
private final SessionTracker sessionTracker = new SessionTracker();
protected final FrameHandler.ConfigurationCustomizer defaultCustomizer = new FrameHandler.ConfigurationCustomizer();
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
protected final Configuration.ConfigurationCustomizer defaultCustomizer = new Configuration.ConfigurationCustomizer();
protected final WebSocketComponents components;
private List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
public JavaxWebSocketContainer(WebSocketComponents components)
{

View File

@ -22,10 +22,12 @@ import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
@ -40,11 +42,12 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryMessageSink;
import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryStreamMessageSink;
@ -60,6 +63,8 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
private final Logger logger;
private final JavaxWebSocketContainer container;
private final Object endpointInstance;
private final AtomicBoolean closeNotified = new AtomicBoolean();
/**
* List of configured named variables in the uri-template.
* <p>
@ -278,9 +283,27 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
dataType = OpCode.UNDEFINED;
}
public void onClose(Frame frame, Callback callback)
{
notifyOnClose(CloseStatus.getCloseStatus(frame), callback);
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
notifyOnClose(closeStatus, callback);
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session));
}
private void notifyOnClose(CloseStatus closeStatus, Callback callback)
{
// Make sure onClose is only notified once.
if (!closeNotified.compareAndSet(false, true))
{
callback.failed(new ClosedChannelException());
return;
}
try
{
if (closeHandle != null)
@ -288,14 +311,13 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
callback.succeeded();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " CLOSE method error: " + cause.getMessage(), cause));
}
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session));
}
@Override
@ -572,11 +594,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
activeMessageSink = null;
}
public void onClose(Frame frame, Callback callback)
{
callback.succeeded();
}
public void onPing(Frame frame, Callback callback)
{
ByteBuffer payload = BufferUtil.copy(frame.getPayload());

View File

@ -21,20 +21,21 @@ package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.SendHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.javax.common.messages.MessageOutputStream;
import org.eclipse.jetty.websocket.javax.common.messages.MessageWriter;
@ -43,11 +44,11 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
private static final Logger LOG = Log.getLogger(JavaxWebSocketRemoteEndpoint.class);
protected final JavaxWebSocketSession session;
private final FrameHandler.CoreSession coreSession;
private final CoreSession coreSession;
protected boolean batch = false;
protected byte messageType = -1;
protected JavaxWebSocketRemoteEndpoint(JavaxWebSocketSession session, FrameHandler.CoreSession coreSession)
protected JavaxWebSocketRemoteEndpoint(JavaxWebSocketSession session, CoreSession coreSession)
{
this.session = session;
this.coreSession = coreSession;
@ -66,10 +67,9 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
@Override
public void flushBatch() throws IOException
{
try (SharedBlockingCallback.Blocker blocker = session.getBlocking().acquire())
{
coreSession.flush(blocker);
}
FutureCallback b = new FutureCallback();
coreSession.flush(b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -227,24 +227,22 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PING).setPayload(data), Callback.NOOP, batch);
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.PING).setPayload(data), b, batch);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PONG).setPayload(data), Callback.NOOP, batch);
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.PONG).setPayload(data), b, batch);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
protected void assertMessageNotNull(Object data)
@ -262,4 +260,10 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
throw new IllegalArgumentException("SendHandler cannot be null");
}
}
private long getBlockingTimeout()
{
long idleTimeout = getIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
@ -38,11 +39,11 @@ import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.javax.common.util.ReflectUtils;
@ -54,9 +55,8 @@ public class JavaxWebSocketSession implements javax.websocket.Session
{
private static final Logger LOG = Log.getLogger(JavaxWebSocketSession.class);
protected final SharedBlockingCallback blocking = new SharedBlockingCallback();
private final JavaxWebSocketContainer container;
private final FrameHandler.CoreSession coreSession;
private final CoreSession coreSession;
private final JavaxWebSocketFrameHandler frameHandler;
private final EndpointConfig config;
private final AvailableDecoders availableDecoders;
@ -69,7 +69,7 @@ public class JavaxWebSocketSession implements javax.websocket.Session
private JavaxWebSocketBasicRemote basicRemote;
public JavaxWebSocketSession(JavaxWebSocketContainer container,
FrameHandler.CoreSession coreSession,
CoreSession coreSession,
JavaxWebSocketFrameHandler frameHandler,
EndpointConfig endpointConfig)
{
@ -94,7 +94,7 @@ public class JavaxWebSocketSession implements javax.websocket.Session
this.userProperties = this.config.getUserProperties();
}
public FrameHandler.CoreSession getCoreSession()
public CoreSession getCoreSession()
{
return coreSession;
}
@ -179,10 +179,7 @@ public class JavaxWebSocketSession implements javax.websocket.Session
@Override
public void close() throws IOException
{
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
{
coreSession.close(blocker);
}
close(new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, null));
}
/**
@ -194,10 +191,15 @@ public class JavaxWebSocketSession implements javax.websocket.Session
@Override
public void close(CloseReason closeReason) throws IOException
{
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
{
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), blocker);
}
FutureCallback b = new FutureCallback();
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
private long getBlockingTimeout()
{
long idleTimeout = getMaxIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
/**
@ -565,9 +567,4 @@ public class JavaxWebSocketSession implements javax.websocket.Session
return String.format("%s@%x[%s,%s]", this.getClass().getSimpleName(), this.hashCode(),
coreSession.getBehavior(), frameHandler);
}
protected SharedBlockingCallback getBlocking()
{
return blocking;
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ -25,9 +26,13 @@ import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
{
private static final Logger LOG = Log.getLogger(SessionTracker.class);
private CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
public Set<Session> getSessions()
@ -52,8 +57,15 @@ public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketS
{
for (Session session : sessions)
{
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
try
{
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.ignore(e);
}
}
super.doStop();

View File

@ -27,7 +27,7 @@ import java.util.Objects;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException;

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
public class ByteBufferMessageSink extends AbstractMessageSink

View File

@ -26,7 +26,7 @@ import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.MessageSink;

View File

@ -26,7 +26,7 @@ import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.MessageSink;

View File

@ -25,7 +25,7 @@ import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.MessageSink;

View File

@ -26,7 +26,7 @@ import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import org.eclipse.jetty.websocket.core.CloseException;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.MessageSink;

View File

@ -28,8 +28,8 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
/**
@ -39,7 +39,7 @@ public class MessageOutputStream extends OutputStream
{
private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
private final FrameHandler.CoreSession coreSession;
private final CoreSession coreSession;
private final ByteBufferPool bufferPool;
private final SharedBlockingCallback blocker;
private long frameCount;
@ -49,7 +49,7 @@ public class MessageOutputStream extends OutputStream
private Callback callback;
private boolean closed;
public MessageOutputStream(FrameHandler.CoreSession coreSession, int bufferSize, ByteBufferPool bufferPool)
public MessageOutputStream(CoreSession coreSession, int bufferSize, ByteBufferPool bufferPool)
{
this.coreSession = coreSession;
this.bufferPool = bufferPool;

View File

@ -30,8 +30,8 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -49,7 +49,7 @@ public class MessageWriter extends Writer
.onUnmappableCharacter(CodingErrorAction.REPORT)
.onMalformedInput(CodingErrorAction.REPORT);
private final FrameHandler.CoreSession coreSession;
private final CoreSession coreSession;
private final SharedBlockingCallback blocker;
private long frameCount;
private Frame frame;
@ -57,7 +57,7 @@ public class MessageWriter extends Writer
private Callback callback;
private boolean closed;
public MessageWriter(FrameHandler.CoreSession coreSession, int bufferSize)
public MessageWriter(CoreSession coreSession, int bufferSize)
{
this.coreSession = coreSession;
this.blocker = new SharedBlockingCallback();

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
public class StringMessageSink extends AbstractMessageSink

View File

@ -23,7 +23,7 @@ import java.util.Map;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.junit.jupiter.api.AfterAll;
@ -50,7 +50,7 @@ public abstract class AbstractJavaxWebSocketFrameHandlerTest
protected AvailableDecoders decoders;
protected Map<String, String> uriParams;
protected EndpointConfig endpointConfig;
protected FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty();
protected CoreSession coreSession = new CoreSession.Empty();
public AbstractJavaxWebSocketFrameHandlerTest()
{

View File

@ -22,7 +22,7 @@ import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -39,7 +39,7 @@ public abstract class AbstractSessionTest
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty();
CoreSession coreSession = new CoreSession.Empty();
session = new JavaxWebSocketSession(container, coreSession, frameHandler, container.getFrameHandlerFactory()
.newDefaultEndpointConfig(websocketPojo.getClass(), null));
}

View File

@ -26,8 +26,8 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.junit.jupiter.api.Test;
@ -139,7 +139,7 @@ public class MessageWriterTest
assertThat("Message[0].length", message.length(), is(testSize));
}
public static class FrameCapture extends FrameHandler.CoreSession.Empty
public static class FrameCapture extends CoreSession.Empty
{
public BlockingQueue<Frame> frames = new LinkedBlockingQueue<>();
@ -151,7 +151,7 @@ public class MessageWriterTest
}
}
public static class WholeMessageCapture extends FrameHandler.CoreSession.Empty
public static class WholeMessageCapture extends CoreSession.Empty
{
public BlockingQueue<String> messages = new LinkedBlockingQueue<>();

View File

@ -37,8 +37,8 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketException;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.servlet.WebSocketMapping;

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
@ -116,7 +117,7 @@ public class CoreServer extends ContainerLifeCycle
}
@Override
public void customize(FrameHandler.Configuration configurable)
public void customize(Configuration configurable)
{
}
@ -178,7 +179,7 @@ public class CoreServer extends ContainerLifeCycle
}
@Override
public void customize(FrameHandler.Configuration configurable)
public void customize(Configuration configurable)
{
}
}

View File

@ -52,6 +52,7 @@ public class EventSocket
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig)
@ -95,5 +96,6 @@ public class EventSocket
if (LOG.isDebugEnabled())
LOG.debug("{} onError(): {}", toString(), cause);
error = cause;
errorLatch.countDown();
}
}

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
@ -81,7 +82,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.client.start();
this.generator = new UnitGenerator(Behavior.CLIENT);
CompletableFuture<FrameHandler.CoreSession> futureHandler = this.client.connect(upgradeRequest);
CompletableFuture<CoreSession> futureHandler = this.client.connect(upgradeRequest);
CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(upgradeRequest.getFuture(), (session, capture) -> capture);
this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS);
}

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.MessageHandler;
public class FrameHandlerTracker extends MessageHandler

Some files were not shown because too many files have changed in this diff Show More