Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-4450-WebSocketCoreJPMS

This commit is contained in:
Lachlan Roberts 2020-01-29 19:21:39 +11:00
commit 1afb7fd1f2
29 changed files with 938 additions and 261 deletions

View File

@ -42,6 +42,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>

View File

@ -16,22 +16,43 @@
// ========================================================================
//
=== Upgrading from Jetty 9.x to Jetty 10.0.x
=== Upgrading from Jetty 9.4.x to Jetty 10.0.x
The purpose of this guide is to assist users migrating from Jetty 9 to 10.
The purpose of this guide is to assist users migrating from Jetty 9.4 to 10.
It is not comprehensive, but covers many of the major changes included in the release that may prove as problem areas for users.
//TODO - Make note of any specific required Java versions.
==== Required Java Version
Jetty 10 requires, at a minimum, Java 9 to function.
Items such as the Java Platform Module System (JPMS), which Jetty 10 supports, are not available in earlier versions of Java.
==== Removed Classes
//TODO - Insert major removed/refactored classes from Jetty-9.x.x to Jetty-10.0.x
==== Changes to Websocket
//TODO - List of changes to Websocket -- Joakim/Lachlan
==== `javax.mail` and `javax.transaction`
Both `javax.mail` and `javax.transaction` have been removed from the Jetty Distribution in Jetty 10.
If you require these jars, you will need to enable the `ext` link:#startup-modules[module] and copy the files to your `$JETTY_BASE/lib/ext` directory.
==== Removed Classes
//TODO - Insert major removed/refactored classes from Jetty-9.x.x to Jetty-10.0.x
==== Module Changes in Jetty 10.0
===== New Modules in Jetty 10.0
//TODO - Insert new modules introduced in Jetty 10
===== Changes to Existing Modules in Jetty 10.0
//TODO - Insert module changes introduced in Jetty 10
==== Changes to Sessions
//TODO - List of changes to Sessions -- Jan
==== Removal of System Properties(?)
//TODO - List of removed System bits --- Greg

View File

@ -31,17 +31,16 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletInputStream;
import javax.servlet.http.Part;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.LazyList;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.MultiMap;
import org.eclipse.jetty.util.QuotedStringTokenizer;
@ -53,24 +52,55 @@ import org.eclipse.jetty.util.log.Logger;
* MultiPartInputStream
* <p>
* Handle a MultiPart Mime input stream, breaking it up on the boundary into files and strings.
* </p>
* <p>
* Deleting the parts can be done from a different thread if the parts are parsed asynchronously.
* Because of this we use the state to fail the parsing and coordinate which thread will delete any remaining parts.
* The deletion of parts is done by the cleanup thread in all cases except the transition from DELETING-&gt;DELETED which
* is done by the parsing thread.
* </p>
* <pre>{@code
* UNPARSED - Parsing has not started, there are no parts which need to be cleaned up.
* PARSING - The parsing thread is reading from the InputStream and generating parts.
* PARSED - Parsing has complete and no more parts will be generated.
* DELETING - deleteParts() has been called while we were in PARSING state, parsing thread will do the delete.
* DELETED - The parts have been deleted, this is the terminal state.
*
* deleteParts()
* +--------------------------------------------------------------+
* | |
* | deleteParts() v
* UNPARSED -------> PARSING --------> PARSED ------------------>DELETED
* | ^
* | |
* +---------------> DELETING -------------------+
* deleteParts() parsing thread
* }</pre>
* @see <a href="https://tools.ietf.org/html/rfc7578">https://tools.ietf.org/html/rfc7578</a>
*/
public class MultiPartFormInputStream
{
private enum State
{
UNPARSED,
PARSING,
PARSED,
DELETING,
DELETED
}
private static final Logger LOG = Log.getLogger(MultiPartFormInputStream.class);
private static final MultiMap<Part> EMPTY_MAP = new MultiMap<>(Collections.emptyMap());
private final MultiMap<Part> _parts;
private InputStream _in;
private MultipartConfigElement _config;
private String _contentType;
private Throwable _err;
private File _tmpDir;
private File _contextTmpDir;
private boolean _deleteOnExit;
private boolean _writeFilesWithFilenames;
private boolean _parsed;
private int _bufferSize = 16 * 1024;
private final MultiMap<Part> _parts = new MultiMap<>();
private final InputStream _in;
private final MultipartConfigElement _config;
private final File _contextTmpDir;
private final String _contentType;
private volatile Throwable _err;
private volatile File _tmpDir;
private volatile boolean _deleteOnExit;
private volatile boolean _writeFilesWithFilenames;
private volatile int _bufferSize = 16 * 1024;
private State state = State.UNPARSED;
public class MultiPart implements Part
{
@ -332,40 +362,38 @@ public class MultiPartFormInputStream
*/
public MultiPartFormInputStream(InputStream in, String contentType, MultipartConfigElement config, File contextTmpDir)
{
// Must be a multipart request.
_contentType = contentType;
_config = config;
_contextTmpDir = contextTmpDir;
if (_contextTmpDir == null)
_contextTmpDir = new File(System.getProperty("java.io.tmpdir"));
if (_contentType == null || !_contentType.startsWith("multipart/form-data"))
throw new IllegalArgumentException("content type is not multipart/form-data");
if (_config == null)
_config = new MultipartConfigElement(_contextTmpDir.getAbsolutePath());
MultiMap parts = new MultiMap();
_contextTmpDir = (contextTmpDir != null) ? contextTmpDir : new File(System.getProperty("java.io.tmpdir"));
_config = (config != null) ? config : new MultipartConfigElement(_contextTmpDir.getAbsolutePath());
if (in instanceof ServletInputStream)
{
if (((ServletInputStream)in).isFinished())
{
parts = EMPTY_MAP;
_parsed = true;
_in = null;
state = State.PARSED;
return;
}
}
if (!_parsed)
_in = new BufferedInputStream(in);
_parts = parts;
_in = new BufferedInputStream(in);
}
/**
* @return whether the list of parsed parts is empty
* @deprecated use getParts().isEmpty()
*/
@Deprecated
public boolean isEmpty()
{
if (_parts == null)
if (_parts.isEmpty())
return true;
Collection<List<Part>> values = _parts.values();
for (List<Part> partList : values)
for (List<Part> partList : _parts.values())
{
if (!partList.isEmpty())
return false;
@ -378,6 +406,33 @@ public class MultiPartFormInputStream
* Delete any tmp storage for parts, and clear out the parts list.
*/
public void deleteParts()
{
synchronized (this)
{
switch (state)
{
case DELETED:
case DELETING:
return;
case PARSING:
state = State.DELETING;
return;
case UNPARSED:
state = State.DELETED;
return;
case PARSED:
state = State.DELETED;
break;
}
}
delete();
}
private void delete()
{
MultiException err = null;
for (List<Part> parts : _parts.values())
@ -410,21 +465,9 @@ public class MultiPartFormInputStream
*/
public Collection<Part> getParts() throws IOException
{
if (!_parsed)
parse();
parse();
throwIfError();
if (_parts.isEmpty())
return Collections.emptyList();
Collection<List<Part>> values = _parts.values();
List<Part> parts = new ArrayList<>();
for (List<Part> o : values)
{
List<Part> asList = LazyList.getList(o, false);
parts.addAll(asList);
}
return parts;
return _parts.values().stream().flatMap(List::stream).collect(Collectors.toList());
}
/**
@ -436,8 +479,7 @@ public class MultiPartFormInputStream
*/
public Part getPart(String name) throws IOException
{
if (!_parsed)
parse();
parse();
throwIfError();
return _parts.getValue(name, 0);
}
@ -468,19 +510,26 @@ public class MultiPartFormInputStream
*/
protected void parse()
{
// have we already parsed the input?
if (_parsed)
return;
_parsed = true;
synchronized (this)
{
switch (state)
{
case UNPARSED:
state = State.PARSING;
break;
case PARSED:
return;
default:
_err = new IOException(state.name());
return;
}
}
MultiPartParser parser = null;
Handler handler = new Handler();
try
{
// if its not a multipart request, don't parse it
if (_contentType == null || !_contentType.startsWith("multipart/form-data"))
return;
// sort out the location to which to write the files
if (_config.getLocation() == null)
_tmpDir = _contextTmpDir;
@ -507,16 +556,23 @@ public class MultiPartFormInputStream
contentTypeBoundary = QuotedStringTokenizer.unquote(value(_contentType.substring(bstart, bend)).trim());
}
parser = new MultiPartParser(handler, contentTypeBoundary);
parser = new MultiPartParser(new Handler(), contentTypeBoundary);
byte[] data = new byte[_bufferSize];
int len;
long total = 0;
while (true)
{
synchronized (this)
{
if (state != State.PARSING)
{
_err = new IOException(state.name());
return;
}
}
len = _in.read(data);
if (len > 0)
{
// keep running total of size of bytes read from input and throw an exception if exceeds MultipartConfigElement._maxRequestSize
@ -570,6 +626,30 @@ public class MultiPartFormInputStream
if (parser != null)
parser.parse(BufferUtil.EMPTY_BUFFER, true);
}
finally
{
boolean cleanup = false;
synchronized (this)
{
switch (state)
{
case PARSING:
state = State.PARSED;
break;
case DELETING:
state = State.DELETED;
cleanup = true;
break;
default:
_err = new IllegalStateException(state.name());
}
}
if (cleanup)
delete();
}
}
class Handler implements MultiPartParser.Handler

View File

@ -25,6 +25,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Base64;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.MultipartConfigElement;
import javax.servlet.ReadListener;
@ -204,12 +206,10 @@ public class MultiPartFormInputStreamTest
throws Exception
{
MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 3072, 50);
MultiPartFormInputStream mpis = new MultiPartFormInputStream(new ByteArrayInputStream(_multi.getBytes()),
"Content-type: text/plain",
config,
_tmpDir);
mpis.setDeleteOnExit(true);
assertTrue(mpis.getParts().isEmpty());
Throwable t = assertThrows(IllegalArgumentException.class, () ->
new MultiPartFormInputStream(new ByteArrayInputStream(_multi.getBytes()),
"Content-type: text/plain", config, _tmpDir));
assertThat(t.getMessage(), is("content type is not multipart/form-data"));
}
@Test
@ -518,6 +518,78 @@ public class MultiPartFormInputStreamTest
mpis.deleteParts(); // this should not be an NPE
}
@Test
public void testAsyncCleanUp() throws Exception
{
final CountDownLatch reading = new CountDownLatch(1);
final InputStream wrappedStream = new ByteArrayInputStream(createMultipartRequestString("myFile").getBytes());
// This stream won't allow the parser to exit because it will never return anything less than 0.
InputStream slowStream = new InputStream()
{
@Override
public int read(byte[] b, int off, int len) throws IOException
{
return Math.max(0, super.read(b, off, len));
}
@Override
public int read() throws IOException
{
reading.countDown();
return wrappedStream.read();
}
};
MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 1024, 50);
MultiPartFormInputStream mpis = new MultiPartFormInputStream(slowStream, _contentType, config, _tmpDir);
// In another thread delete the parts when we detect that we have started parsing.
CompletableFuture<Throwable> cleanupError = new CompletableFuture<>();
new Thread(() ->
{
try
{
assertTrue(reading.await(5, TimeUnit.SECONDS));
mpis.deleteParts();
cleanupError.complete(null);
}
catch (Throwable t)
{
cleanupError.complete(t);
}
}).start();
// The call to getParts should throw an error.
Throwable error = assertThrows(IOException.class, mpis::getParts);
assertThat(error.getMessage(), is("DELETING"));
// There was no error with the cleanup.
assertNull(cleanupError.get());
// No tmp files are remaining.
String[] fileList = _tmpDir.list();
assertNotNull(fileList);
assertThat(fileList.length, is(0));
}
@Test
public void testParseAfterCleanUp() throws Exception
{
final InputStream input = new ByteArrayInputStream(createMultipartRequestString("myFile").getBytes());
MultipartConfigElement config = new MultipartConfigElement(_dirname, 1024, 1024, 50);
MultiPartFormInputStream mpis = new MultiPartFormInputStream(input, _contentType, config, _tmpDir);
mpis.deleteParts();
// The call to getParts should throw because we have already cleaned up the parts.
Throwable error = assertThrows(IOException.class, mpis::getParts);
assertThat(error.getMessage(), is("DELETED"));
// Even though we called getParts() we never even created the tmp directory as we had already called deleteParts().
assertFalse(_tmpDir.exists());
}
@Test
public void testLFOnlyRequest()

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -1077,7 +1078,7 @@ public class ServletContextHandler extends ContextHandler
throw new IllegalStateException();
if (StringUtil.isBlank(name))
throw new IllegalStateException("Missing name");
throw new IllegalArgumentException("Missing name");
if (!_enabled)
throw new UnsupportedOperationException();
@ -1287,9 +1288,20 @@ public class ServletContextHandler extends ContextHandler
return null; //existing completed registration for servlet name
}
@Override
public String getInitParameter(String name)
{
//since servlet spec 4.0
Objects.requireNonNull(name);
return super.getInitParameter(name);
}
@Override
public boolean setInitParameter(String name, String value)
{
//since servlet spec 4.0
Objects.requireNonNull(name);
if (!isStarting())
throw new IllegalStateException();

View File

@ -87,6 +87,7 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -97,6 +98,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -470,6 +472,32 @@ public class ServletContextHandlerTest
_server.join();
}
@Test
public void testInitParams() throws Exception
{
//Test get/setInitParam with null throws NPE
ServletContextHandler root = new ServletContextHandler(_server, "/", ServletContextHandler.SESSIONS);
_server.setHandler(root);
ListenerHolder initialListener = new ListenerHolder();
initialListener.setListener(new ServletContextListener()
{
public void contextInitialized(ServletContextEvent sce)
{
sce.getServletContext().setInitParameter("foo", "bar");
assertEquals("bar", sce.getServletContext().getInitParameter("foo"));
assertThrows(NullPointerException.class,
() -> sce.getServletContext().setInitParameter(null, "bad")
);
assertThrows(NullPointerException.class,
() -> sce.getServletContext().getInitParameter(null)
);
}
});
root.getServletHandler().addListener(initialListener);
_server.start();
}
@Test
public void testGetSetSessionTimeout() throws Exception
{
@ -1000,6 +1028,9 @@ public class ServletContextHandlerTest
}
}
assertThrows(IllegalArgumentException.class, () -> root.getServletContext().addJspFile(null, "/path/to/some.jsp"));
assertThrows(IllegalArgumentException.class, () -> root.getServletContext().addJspFile("", "/path/to/some.jsp"));
root.addBean(new MySCIStarter(root.getServletContext(), new JSPAddingSCI()), true);
_server.start();
MappedResource<ServletHolder> mappedServlet = root.getServletHandler().getMappedServlet("/somejsp/xxx");

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -138,6 +139,42 @@ public class FutureCallback implements Future<Void>, Callback
throw new ExecutionException(_cause);
}
public void block() throws IOException
{
block(-1, TimeUnit.SECONDS);
}
public void block(long timeout, TimeUnit unit) throws IOException
{
try
{
if (timeout > 0)
get(timeout, unit);
else
get();
}
catch (InterruptedException e)
{
InterruptedIOException exception = new InterruptedIOException();
exception.initCause(e);
throw exception;
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
else if (cause instanceof IOException)
throw (IOException)cause;
else
throw new IOException(cause);
}
catch (TimeoutException e)
{
throw new IOException(e);
}
}
public static void rethrow(ExecutionException e) throws IOException
{
Throwable cause = e.getCause();

View File

@ -241,16 +241,19 @@ public interface CoreSession extends OutgoingFrames, Configuration
@Override
public void flush(Callback callback)
{
callback.succeeded();
}
@Override
public void close(Callback callback)
{
callback.succeeded();
}
@Override
public void close(int statusCode, String reason, Callback callback)
{
callback.succeeded();
}
@Override
@ -261,6 +264,7 @@ public interface CoreSession extends OutgoingFrames, Configuration
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
callback.succeeded();
}
}
}

View File

@ -142,7 +142,7 @@ public class WebSocketSessionState
}
}
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
public boolean onOutgoingFrame(Frame frame) throws Exception
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
@ -150,7 +150,7 @@ public class WebSocketSessionState
synchronized (this)
{
if (!isOutputOpen())
throw new IllegalStateException(_sessionState.toString());
throw new ClosedChannelException();
if (opcode == OpCode.CLOSE)
{

View File

@ -0,0 +1,135 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlushTest
{
private WebSocketServer server;
private TestFrameHandler serverHandler = new TestFrameHandler();
private WebSocketCoreClient client;
private WebSocketComponents components = new WebSocketComponents();
@BeforeEach
public void startup() throws Exception
{
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverHandler);
server = new WebSocketServer(negotiator);
client = new WebSocketCoreClient(null, components);
server.start();
client.start();
}
@AfterEach
public void shutdown() throws Exception
{
server.stop();
client.stop();
}
@Test
public void testStandardFlush() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
// Send a batched frame.
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
// We have batched the frame and not sent it.
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
// Once we flush the frame is received.
clientHandler.getCoreSession().flush(Callback.NOOP);
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
clientHandler.sendClose();
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
}
@Test
public void testFlushOnCloseFrame() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
// Send a batched frame.
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
// We have batched the frame and not sent it.
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
// Sending the close initiates the flush and the frame is received.
clientHandler.sendClose();
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
}
@Test
public void testFlushAfterClose() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
Callback.Completable flushCallback = new Callback.Completable();
clientHandler.getCoreSession().flush(flushCallback);
ExecutionException e = assertThrows(ExecutionException.class, () -> flushCallback.get(5, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(ClosedChannelException.class));
}
}

View File

@ -19,9 +19,12 @@
package org.eclipse.jetty.websocket.core;
import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.NetworkConnector;
@ -48,10 +51,13 @@ import org.junit.jupiter.params.provider.ValueSource;
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -425,6 +431,97 @@ public class WebSocketCloseTest extends WebSocketTester
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleNormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 1", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 2", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
// Normal close frame received on client.
Frame closeFrame = receiveFrame(client.getInputStream());
assertThat(closeFrame.getOpCode(), is(OpCode.CLOSE));
CloseStatus closeStatus = CloseStatus.getCloseStatus(closeFrame);
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(closeStatus.getReason(), is("normal 1"));
// Send close response from client.
client.getOutputStream().write(RawFrameBuilder.buildClose(
new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true));
server.handler.getCoreSession().demand(1);
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
Callback closeFrameCallback = Objects.requireNonNull(server.handler.receivedCallback.poll());
closeFrameCallback.succeeded();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.closeStatus.getReason(), is("normal response 1"));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleAbnormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "server error should succeed", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.PROTOCOL, "protocol error should fail", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("server error should succeed"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
}
@ParameterizedTest
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
public void doubleCloseAbnormalOvertakesNormalClose(String scheme) throws Exception
{
setup(State.OPEN, scheme);
Callback.Completable callback1 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal close (client does not complete close handshake)", callback1);
Callback.Completable callback2 = new Callback.Completable();
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "error close should overtake normal close", callback2);
// First Callback Succeeded
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
// Second Callback Failed with ClosedChannelException
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(server.handler.closeStatus.getReason(), containsString("error close should overtake normal close"));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
}
static class DemandingTestFrameHandler implements SynchronousFrameHandler
{
private CoreSession coreSession;

View File

@ -47,6 +47,7 @@ public class WebSocketTester
private static String NON_RANDOM_KEY = Base64.getEncoder().encodeToString("0123456701234567".getBytes());
private static SslContextFactory.Client sslContextFactory;
protected ByteBufferPool bufferPool;
protected ByteBuffer buffer;
protected Parser parser;
@BeforeAll
@ -159,33 +160,34 @@ public class WebSocketTester
protected Parser.ParsedFrame receiveFrame(InputStream in) throws IOException
{
ByteBuffer buffer = bufferPool.acquire(4096, false);
if (buffer == null)
buffer = bufferPool.acquire(4096, false);
while (true)
{
Parser.ParsedFrame frame = parser.parse(buffer);
if (!buffer.hasRemaining())
BufferUtil.clear(buffer);
if (frame != null)
return frame;
int p = BufferUtil.flipToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return null;
buffer.position(buffer.position() + len);
BufferUtil.flipToFlush(buffer, p);
Parser.ParsedFrame frame = parser.parse(buffer);
if (frame != null)
return frame;
}
}
protected void receiveEof(InputStream in) throws IOException
{
ByteBuffer buffer = bufferPool.acquire(4096, false);
while (true)
{
BufferUtil.flipToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return;
BufferUtil.clearToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return;
throw new IllegalStateException("unexpected content");
}
throw new IllegalStateException("unexpected content");
}
}

View File

@ -22,11 +22,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.websocket.EncodeException;
import javax.websocket.RemoteEndpoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -65,10 +66,10 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendBinary({})", BufferUtil.toDetailString(data));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
}
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -79,37 +80,36 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendBinary({},{})", BufferUtil.toDetailString(partialByte), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(partialByte);
frame.setFin(isLast);
sendFrame(frame, b, false);
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.BINARY);
break;
case OpCode.TEXT:
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
case OpCode.BINARY:
frame = new Frame(OpCode.CONTINUATION);
break;
default:
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(partialByte);
frame.setFin(isLast);
FutureCallback b = new FutureCallback();
sendFrame(frame, b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void sendObject(Object data) throws IOException, EncodeException
{
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
super.sendObject(data, b);
}
FutureCallback b = new FutureCallback();
super.sendObject(data, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -120,10 +120,11 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendText({})", TextUtil.hint(text));
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
}
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -134,27 +135,33 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
{
LOG.debug("sendText({},{})", TextUtil.hint(partialMessage), isLast);
}
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
{
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
sendFrame(frame, b, false);
Frame frame;
switch (messageType)
{
case -1:
// New message!
frame = new Frame(OpCode.TEXT);
break;
case OpCode.TEXT:
frame = new Frame(OpCode.CONTINUATION);
break;
case OpCode.BINARY:
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
default:
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
}
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
frame.setFin(isLast);
FutureCallback b = new FutureCallback();
sendFrame(frame, b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
private long getBlockingTimeout()
{
long idleTimeout = getIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
}

View File

@ -21,13 +21,14 @@ package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.SendHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -66,10 +67,9 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
@Override
public void flushBatch() throws IOException
{
try (SharedBlockingCallback.Blocker blocker = session.getBlocking().acquire())
{
coreSession.flush(blocker);
}
FutureCallback b = new FutureCallback();
coreSession.flush(b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -227,24 +227,22 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PING).setPayload(data), Callback.NOOP, batch);
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.PING).setPayload(data), b, batch);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
}
// TODO: is this supposed to be a blocking call?
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
sendFrame(new Frame(OpCode.PONG).setPayload(data), Callback.NOOP, batch);
FutureCallback b = new FutureCallback();
sendFrame(new Frame(OpCode.PONG).setPayload(data), b, batch);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
protected void assertMessageNotNull(Object data)
@ -262,4 +260,10 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
throw new IllegalArgumentException("SendHandler cannot be null");
}
}
private long getBlockingTimeout()
{
long idleTimeout = getIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
@ -38,7 +39,7 @@ import javax.websocket.RemoteEndpoint.Basic;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -54,7 +55,6 @@ public class JavaxWebSocketSession implements javax.websocket.Session
{
private static final Logger LOG = Log.getLogger(JavaxWebSocketSession.class);
protected final SharedBlockingCallback blocking = new SharedBlockingCallback();
private final JavaxWebSocketContainer container;
private final CoreSession coreSession;
private final JavaxWebSocketFrameHandler frameHandler;
@ -179,10 +179,7 @@ public class JavaxWebSocketSession implements javax.websocket.Session
@Override
public void close() throws IOException
{
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
{
coreSession.close(blocker);
}
close(new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, null));
}
/**
@ -194,10 +191,15 @@ public class JavaxWebSocketSession implements javax.websocket.Session
@Override
public void close(CloseReason closeReason) throws IOException
{
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
{
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), blocker);
}
FutureCallback b = new FutureCallback();
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
private long getBlockingTimeout()
{
long idleTimeout = getMaxIdleTimeout();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
/**
@ -565,9 +567,4 @@ public class JavaxWebSocketSession implements javax.websocket.Session
return String.format("%s@%x[%s,%s]", this.getClass().getSimpleName(), this.hashCode(),
coreSession.getBehavior(), frameHandler);
}
protected SharedBlockingCallback getBlocking()
{
return blocking;
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.javax.common;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ -25,9 +26,13 @@ import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
{
private static final Logger LOG = Log.getLogger(SessionTracker.class);
private CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
public Set<Session> getSessions()
@ -52,8 +57,15 @@ public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketS
{
for (Session session : sessions)
{
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
try
{
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.ignore(e);
}
}
super.doStop();

View File

@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SessionTrackingTest
{
static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>();
private static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>();
@ServerEndpoint("/session-info/{sessionId}")
public static class SessionTrackingSocket
@ -104,59 +104,53 @@ public class SessionTrackingTest
EventSocket clientSocket2 = new EventSocket();
EventSocket clientSocket3 = new EventSocket();
try (Session session1 = client.connectToServer(clientSocket1, server.getWsUri().resolve("/session-info/1")))
{
Session serverSession1 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession1);
sendTextFrameToAll("openSessions|in-1", session1);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-1).size=1"));
Session session1 = client.connectToServer(clientSocket1, server.getWsUri().resolve("/session-info/1"));
Session serverSession1 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession1);
sendTextFrameToAll("openSessions|in-1", session1);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-1).size=1"));
try (Session session2 = client.connectToServer(clientSocket2, server.getWsUri().resolve("/session-info/2")))
{
Session serverSession2 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession2);
sendTextFrameToAll("openSessions|in-2", session1, session2);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
Session session2 = client.connectToServer(clientSocket2, server.getWsUri().resolve("/session-info/2"));
Session serverSession2 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession2);
sendTextFrameToAll("openSessions|in-2", session1, session2);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
try (Session session3 = client.connectToServer(clientSocket3, server.getWsUri().resolve("/session-info/3")))
{
Session serverSession3 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession3);
sendTextFrameToAll("openSessions|in-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
Session session3 = client.connectToServer(clientSocket3, server.getWsUri().resolve("/session-info/3"));
Session serverSession3 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession3);
sendTextFrameToAll("openSessions|in-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
// assert session is closed, and we have received the notification from the SessionListener
session3.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession3));
assertTrue(clientSocket3.closeLatch.await(5, TimeUnit.SECONDS));
}
// assert session is closed, and we have received the notification from the SessionListener
session3.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession3));
assertTrue(clientSocket3.closeLatch.await(5, TimeUnit.SECONDS));
sendTextFrameToAll("openSessions|lvl-2", session1, session2);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
sendTextFrameToAll("openSessions|lvl-2", session1, session2);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
// assert session is closed, and we have received the notification from the SessionListener
session2.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession2));
assertTrue(clientSocket2.closeLatch.await(5, TimeUnit.SECONDS));
}
// assert session is closed, and we have received the notification from the SessionListener
session2.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession2));
assertTrue(clientSocket2.closeLatch.await(5, TimeUnit.SECONDS));
sendTextFrameToAll("openSessions|lvl-1", session1);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-1).size=1"));
sendTextFrameToAll("openSessions|lvl-1", session1);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-1).size=1"));
// assert session is closed, and we have received the notification from the SessionListener
session1.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession1));
assertTrue(clientSocket1.closeLatch.await(5, TimeUnit.SECONDS));
}
// assert session is closed, and we have received the notification from the SessionListener
session1.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession1));
assertTrue(clientSocket1.closeLatch.await(5, TimeUnit.SECONDS));
}
private static void sendTextFrameToAll(String msg, Session... sessions) throws IOException

View File

@ -22,27 +22,32 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.ProtocolException;
import static java.nio.charset.StandardCharsets.UTF_8;
public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket.api.RemoteEndpoint
{
private final CoreSession coreSession;
private static final Logger LOG = Log.getLogger(JettyWebSocketRemoteEndpoint.class);
private final FrameHandler.CoreSession coreSession;
private byte messageType = -1;
private final SharedBlockingCallback blocker = new SharedBlockingCallback();
private BatchMode batchMode;
public JettyWebSocketRemoteEndpoint(CoreSession coreSession, BatchMode batchMode)
public JettyWebSocketRemoteEndpoint(FrameHandler.CoreSession coreSession, BatchMode batchMode)
{
this.coreSession = Objects.requireNonNull(coreSession);
this.batchMode = batchMode;
@ -55,14 +60,7 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
*/
public void close()
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.close(b);
}
catch (IOException e)
{
coreSession.close(Callback.NOOP);
}
close(StatusCode.NO_CODE, null);
}
/**
@ -74,13 +72,15 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
*/
public void close(int statusCode, String reason)
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
try
{
FutureCallback b = new FutureCallback();
coreSession.close(statusCode, reason, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
catch (IOException e)
{
coreSession.close(Callback.NOOP);
LOG.ignore(e);
}
}
@ -114,11 +114,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
sendPartialBytes(fragment, isLast, b);
b.block();
}
FutureCallback b = new FutureCallback();
sendPartialBytes(fragment, isLast, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -158,11 +156,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
@Override
public void sendPartialString(String fragment, boolean isLast) throws IOException
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
sendPartialText(fragment, isLast, b);
b.block();
}
FutureCallback b = new FutureCallback();
sendPartialText(fragment, isLast, b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -227,16 +223,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
private void sendBlocking(Frame frame) throws IOException
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
b.block();
}
}
protected CoreSession getCoreSession()
{
return coreSession;
FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -265,10 +254,14 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
@Override
public void flush() throws IOException
{
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.flush(b);
b.block();
}
FutureCallback b = new FutureCallback();
coreSession.flush(b);
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
}
private long getBlockingTimeout()
{
long idleTimeout = coreSession.getIdleTimeout().toMillis();
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
}
}

View File

@ -183,7 +183,7 @@ public class WebSocketSession implements Session, SuspendToken, Dumpable
@Override
public boolean isOpen()
{
return remoteEndpoint.getCoreSession().isOutputOpen();
return coreSession.isOutputOpen();
}
@Override

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
@ -125,8 +126,7 @@ public class WebSocketStopTest
assertThat(clientSocket.statusCode, is(StatusCode.NORMAL));
assertThat(serverSocket.statusCode, is(StatusCode.NORMAL));
IllegalStateException failure = assertThrows(IllegalStateException.class,
assertThrows(ClosedChannelException.class,
() -> session.getRemote().sendString("this should fail before ExtensionStack"));
assertThat(failure.getMessage(), is("CLOSED"));
}
}

View File

@ -65,6 +65,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -285,6 +286,29 @@ public class ClientCloseTest
clientSessionTracker.assertClosedProperly(client);
}
@Test
public void testDoubleClose() throws Exception
{
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
clientSessionTracker.addTo(client);
// Client connects
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
// Client confirms connection via echo.
confirmConnection(clientSocket, clientConnectFuture);
// Close twice, first close should succeed and second close is a NOOP
clientSocket.getSession().close(StatusCode.NORMAL, "close1");
clientSocket.getSession().close(StatusCode.NO_CODE, "close2");
// Second close is ignored, we are notified of the first close.
clientSocket.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString("close1"));
assertNull(clientSocket.error.get());
}
@Test
public void testStopLifecycle() throws Exception
{

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.hamcrest.Matcher;
@ -34,6 +35,7 @@ import static org.hamcrest.Matchers.nullValue;
public abstract class AbstractCloseEndpoint extends WebSocketAdapter
{
public final Logger log;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public String closeReason = null;
public int closeStatusCode = -1;
@ -44,6 +46,14 @@ public abstract class AbstractCloseEndpoint extends WebSocketAdapter
this.log = Log.getLogger(this.getClass().getName());
}
@Override
public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);
log.debug("onWebSocketConnect({})", sess);
openLatch.countDown();
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{

View File

@ -0,0 +1,31 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import org.eclipse.jetty.websocket.api.StatusCode;
public class CloseInOnCloseEndpoint extends AbstractCloseEndpoint
{
@Override
public void onWebSocketClose(int statusCode, String reason)
{
getSession().close(StatusCode.SERVER_ERROR, "this should be a noop");
super.onWebSocketClose(statusCode, reason);
}
}

View File

@ -0,0 +1,47 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.websocket.api.StatusCode;
public class CloseInOnCloseEndpointNewThread extends AbstractCloseEndpoint
{
@Override
public void onWebSocketClose(int statusCode, String reason)
{
try
{
CountDownLatch complete = new CountDownLatch(1);
new Thread(() ->
{
getSession().close(StatusCode.SERVER_ERROR, "this should be a noop");
complete.countDown();
}).start();
complete.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
super.onWebSocketClose(statusCode, reason);
}
}

View File

@ -63,6 +63,16 @@ public class ServerCloseCreator implements JettyWebSocketCreator
closeSocket = new ContainerEndpoint(container);
resp.setAcceptedSubProtocol("container");
}
else if (req.hasSubProtocol("closeInOnClose"))
{
closeSocket = new CloseInOnCloseEndpoint();
resp.setAcceptedSubProtocol("closeInOnClose");
}
else if (req.hasSubProtocol("closeInOnCloseNewThread"))
{
closeSocket = new CloseInOnCloseEndpointNewThread();
resp.setAcceptedSubProtocol("closeInOnCloseNewThread");
}
if (closeSocket != null)
{

View File

@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests various close scenarios
@ -275,4 +276,52 @@ public class ServerCloseTest
close(session);
}
}
@Test
public void testSecondCloseFromOnClosed() throws Exception
{
// Testing WebSocketSession.close() in onClosed() does not cause deadlock.
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("closeInOnClose");
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
client.connect(clientEndpoint, wsUri, request).get(5, SECONDS);
// Hard close from the server. Server onClosed() will try to close again which should be a NOOP.
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
assertTrue(serverEndpoint.openLatch.await(5, SECONDS));
serverEndpoint.getSession().close(StatusCode.SHUTDOWN, "SHUTDOWN hard close");
// Verify that client got close
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SHUTDOWN), containsString("SHUTDOWN hard close"));
// Verify that server socket got close event
assertTrue(serverEndpoint.closeLatch.await(5, SECONDS));
assertThat(serverEndpoint.closeStatusCode, is(StatusCode.SHUTDOWN));
}
@Test
public void testSecondCloseFromOnClosedInNewThread() throws Exception
{
// Testing WebSocketSession.close() in onClosed() does not cause deadlock.
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("closeInOnCloseNewThread");
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
client.connect(clientEndpoint, wsUri, request).get(5, SECONDS);
// Hard close from the server. Server onClosed() will try to close again which should be a NOOP.
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
assertTrue(serverEndpoint.openLatch.await(5, SECONDS));
serverEndpoint.getSession().close(StatusCode.SHUTDOWN, "SHUTDOWN hard close");
// Verify that client got close
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SHUTDOWN), containsString("SHUTDOWN hard close"));
// Verify that server socket got close event
assertTrue(serverEndpoint.closeLatch.await(5, SECONDS));
assertThat(serverEndpoint.closeStatusCode, is(StatusCode.SHUTDOWN));
}
}