Issue #207 - prevent overlapping websocket streaming dispatch

+ when FIN==true, wait for dispatch to return before processing
  more frames
This commit is contained in:
Joakim Erdfelt 2016-10-13 14:42:08 -07:00
parent f920c65bcc
commit a0b7bc4623
4 changed files with 356 additions and 20 deletions

View File

@ -0,0 +1,139 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.function;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.jsr356.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.jsr356.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.jsr356.endpoints.TrackingSocket;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class JsrEndpointFunctions_OnMessage_BinaryStreamTest
{
private static ClientContainer container;
@BeforeClass
public static void initContainer()
{
container = new ClientContainer();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
private AvailableEncoders encoders;
private AvailableDecoders decoders;
private Map<String, String> uriParams = new HashMap<>();
private EndpointConfig endpointConfig;
public JsrEndpointFunctions_OnMessage_BinaryStreamTest()
{
endpointConfig = new EmptyClientEndpointConfig();
encoders = new AvailableEncoders(endpointConfig);
decoders = new AvailableDecoders(endpointConfig);
uriParams = new HashMap<>();
}
public JsrSession newSession(Object websocket)
{
String id = JsrEndpointFunctions_OnMessage_BinaryStreamTest.class.getSimpleName();
URI requestURI = URI.create("ws://localhost/" + id);
WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection);
}
@SuppressWarnings("Duplicates")
private TrackingSocket performOnMessageInvocation(TrackingSocket socket, Function<EndpointFunctions, Void> func) throws Exception
{
// Establish endpoint function
JsrEndpointFunctions endpointFunctions = new JsrEndpointFunctions(
socket, container.getPolicy(),
container.getExecutor(),
encoders,
decoders,
uriParams,
endpointConfig
);
endpointFunctions.start();
// This invocation is the same for all tests
endpointFunctions.onOpen(newSession(socket));
func.apply(endpointFunctions);
return socket;
}
@ClientEndpoint
public static class MessageStreamSocket extends TrackingSocket
{
@OnMessage
public void onMessage(InputStream stream)
{
try
{
String msg = IO.toString(stream, StandardCharsets.UTF_8);
addEvent("onMessage(%s) = \"%s\"", stream.getClass().getSimpleName(), msg);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
@Test
public void testInvokeMessageText() throws Exception
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamSocket(), (endpoint) ->
{
endpoint.onBinary(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
return null;
});
socket.assertEvent("onMessage(MessageInputStream) = \"Hello World\"");
}
}

View File

@ -0,0 +1,139 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.function;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.client.EmptyClientEndpointConfig;
import org.eclipse.jetty.websocket.jsr356.decoders.AvailableDecoders;
import org.eclipse.jetty.websocket.jsr356.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.jsr356.endpoints.TrackingSocket;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class JsrEndpointFunctions_OnMessage_TextStreamTest
{
private static ClientContainer container;
@BeforeClass
public static void initContainer()
{
container = new ClientContainer();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
private AvailableEncoders encoders;
private AvailableDecoders decoders;
private Map<String, String> uriParams = new HashMap<>();
private EndpointConfig endpointConfig;
public JsrEndpointFunctions_OnMessage_TextStreamTest()
{
endpointConfig = new EmptyClientEndpointConfig();
encoders = new AvailableEncoders(endpointConfig);
decoders = new AvailableDecoders(endpointConfig);
uriParams = new HashMap<>();
}
public JsrSession newSession(Object websocket)
{
String id = JsrEndpointFunctions_OnMessage_TextStreamTest.class.getSimpleName();
URI requestURI = URI.create("ws://localhost/" + id);
WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
DummyConnection connection = new DummyConnection(policy);
ClientEndpointConfig config = new EmptyClientEndpointConfig();
ConfiguredEndpoint ei = new ConfiguredEndpoint(websocket, config);
return new JsrSession(container, id, requestURI, ei, policy, connection);
}
@SuppressWarnings("Duplicates")
private TrackingSocket performOnMessageInvocation(TrackingSocket socket, Function<EndpointFunctions, Void> func) throws Exception
{
// Establish endpoint function
JsrEndpointFunctions endpointFunctions = new JsrEndpointFunctions(
socket, container.getPolicy(),
container.getExecutor(),
encoders,
decoders,
uriParams,
endpointConfig
);
endpointFunctions.start();
// This invocation is the same for all tests
endpointFunctions.onOpen(newSession(socket));
func.apply(endpointFunctions);
return socket;
}
@ClientEndpoint
public static class MessageStreamSocket extends TrackingSocket
{
@OnMessage
public void onMessage(Reader stream)
{
try
{
String msg = IO.toString(stream);
addEvent("onMessage(%s) = \"%s\"", stream.getClass().getSimpleName(), msg);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
@Test
public void testInvokeMessageText() throws Exception
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamSocket(), (endpoint) ->
{
endpoint.onText(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
return null;
});
socket.assertEvent("onMessage(MessageReader) = \"Hello World\"");
}
}

View File

@ -20,14 +20,21 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class InputStreamMessageSink implements MessageSink
{
private static final Logger LOG = Log.getLogger(ReaderMessageSink.class);
private final Function<InputStream, Void> onStreamFunction;
private final Executor executor;
private MessageInputStream stream;
private CountDownLatch dispatchCompleted = new CountDownLatch(1);
public InputStreamMessageSink(Executor executor, Function<InputStream, Void> function)
{
@ -51,17 +58,44 @@ public class InputStreamMessageSink implements MessageSink
stream.accept(payload,fin);
if (first)
{
dispatchCompleted = new CountDownLatch(1);
executor.execute(() -> {
// processing of errors is the responsibility
// of the stream function
onStreamFunction.apply(stream);
final MessageInputStream dispatchedStream = stream;
try
{
onStreamFunction.apply(dispatchedStream);
}
catch (Throwable t)
{
// processing of errors is the responsibility
// of the stream function
if (LOG.isDebugEnabled())
{
LOG.debug("Unhandled throwable", t);
}
}
// Returned from dispatch, stream should be closed
IO.close(dispatchedStream);
dispatchCompleted.countDown();
});
}
}
finally
{
//noinspection Duplicates
if (fin)
{
if (LOG.isDebugEnabled())
LOG.debug("dispatch complete await() - {}", stream);
try
{
dispatchCompleted.await();
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug(e);
}
stream = null;
}
}

View File

@ -20,59 +20,83 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class ReaderMessageSink implements MessageSink
{
private static final Logger LOG = Log.getLogger(ReaderMessageSink.class);
private final Executor executor;
private final Function<Reader, Void> onStreamFunction;
private final Executor executor;
private MessageReader stream;
private CountDownLatch dispatchCompleted = new CountDownLatch(1);
public ReaderMessageSink(Executor executor, Function<Reader, Void> function)
{
this.executor = executor;
this.onStreamFunction = function;
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
{
try
{
boolean first = false;
if (stream == null)
{
stream = new MessageReader(new MessageInputStream());
first = true;
}
stream.accept(payload,fin);
stream.accept(payload, fin);
if (first)
{
executor.execute(() -> {
// processing of errors is the responsibility
// of the stream function
if(LOG.isDebugEnabled())
LOG.debug("onStreamFunction.apply({})", stream);
onStreamFunction.apply(stream);
dispatchCompleted = new CountDownLatch(1);
executor.execute(() ->
{
final MessageReader dispatchedStream = stream;
try
{
onStreamFunction.apply(dispatchedStream);
}
catch (Throwable t)
{
// processing of errors is the responsibility
// of the stream function
if (LOG.isDebugEnabled())
{
LOG.debug("Unhandled throwable", t);
}
}
// Returned from dispatch, stream should be closed
IO.close(dispatchedStream);
dispatchCompleted.countDown();
});
}
}
finally
{
//noinspection Duplicates
if (fin)
{
if(LOG.isDebugEnabled())
LOG.debug("stream.awaitClose() - {}", stream);
stream.awaitClose();
if(LOG.isDebugEnabled())
LOG.debug("stream recycled - {}", stream);
if (LOG.isDebugEnabled())
LOG.debug("dispatch complete await() - {}", stream);
try
{
dispatchCompleted.await();
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug(e);
}
stream = null;
}
}