Issue #207 - cleaning up dispatched MessageSink threading

This commit is contained in:
Joakim Erdfelt 2017-05-09 12:20:46 -07:00
parent d0c7f822f7
commit 4bae04b623
7 changed files with 194 additions and 156 deletions

View File

@ -261,12 +261,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if(LOG.isDebugEnabled())
LOG.debug("onFrame({}).succeed()", frame);
parser.release(frame);
if(!result.compareAndSet(false,true))
{
// callback has been notified asynchronously
fillAndParse();
// fillInterested();
}
}

View File

@ -21,8 +21,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
@ -30,8 +28,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
*/
public class FutureWriteCallback extends FutureCallback implements WriteCallback
{
private static final Logger LOG = Log.getLogger(FutureWriteCallback.class);
@Override
public void writeFailed(Throwable cause)
{

View File

@ -0,0 +1,167 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
/**
* Centralized logic for Dispatched Message Handling.
* <p>
* A Dispatched MessageSink can consist of 1 or more {@link #accept(Frame, FrameCallback)} calls.
* <p>
* The first {@link #accept(Frame, FrameCallback)} in a message will trigger a dispatch to the
* function specified in the constructor.
* <p>
* The completion of the dispatched function call is the sign that the next message is suitable
* for processing from the network. (The connection fillAndParse should remain idle for the
* NEXT message until such time as the dispatched function call has completed)
* </p>
* <p>
* There are a few use cases we need to handle.
* </p>
* <p>
* <em>1. Normal Processing</em>
* </p>
* <pre>
* Connection Thread | DispatchedMessageSink | Thread 2
* TEXT > accept
* - dispatch - function.read(stream)
* CONT > accept stream.read()
* CONT > accept stream.read()
* CONT=fin > accept stream.read()
* EOF stream.read EOF
* IDLE
* exit method
* RESUME(NEXT MSG)
* </pre>
* <p>
* <em>2. Early Exit (with no activity)</em>
* </p>
* <pre>
* Connection Thread | DispatchedMessageSink | Thread 2
* TEXT > accept
* - dispatch - function.read(stream)
* CONT > accept exit method (normal return)
* IDLE
* TIMEOUT
* </pre>
* <p>
* <em>3. Early Exit (due to exception)</em>
* </p>
* <pre>
* Connection Thread | DispatchedMessageSink | Thread 2
* TEXT > accept
* - dispatch - function.read(stream)
* CONT > accept exit method (throwable)
* callback.fail()
* endpoint.onError()
* close(error)
* </pre>
* <p>
* <em>4. Early Exit (with Custom Threading)</em>
* </p>
* <pre>
* Connection Thread | DispatchedMessageSink | Thread 2 | Thread 3
* TEXT > accept
* - dispatch - function.read(stream)
* thread.new(stream) stream.read()
* exit method
* CONT > accept stream.read()
* CONT > accept stream.read()
* CONT=fin > accept stream.read()
* EOF stream.read EOF
* RESUME(NEXT MSG)
* </pre>
*
* @param <T> the type of object to give to user function
* @param <R> the type of object that user function will return
*/
public abstract class DispatchedMessageSink<T, R> implements MessageSink
{
private final Executor executor;
private final Function<T, R> function;
private CompletableFuture<Void> dispatchComplete;
private MessageSink typeSink;
public DispatchedMessageSink(Executor executor, Function<T, R> function)
{
this.executor = executor;
this.function = function;
}
public abstract MessageSink newSink(Frame frame);
public void accept(Frame frame, final FrameCallback callback)
{
if (typeSink == null)
{
typeSink = newSink(frame);
// Dispatch to end user function (will likely start with blocking for data/accept)
dispatchComplete = CompletableFuture.supplyAsync(() ->
{
final T dispatchedType = (T) typeSink;
function.apply(dispatchedType);
return null;
}, executor);
}
final FrameCallback frameCallback;
if (frame.isFin())
{
CompletableFuture<Void> finComplete = new CompletableFuture<>();
frameCallback = new FrameCallback()
{
@Override
public void fail(Throwable cause)
{
finComplete.completeExceptionally(cause);
}
@Override
public void succeed()
{
finComplete.complete(null);
}
};
CompletableFuture.allOf(dispatchComplete, finComplete).whenComplete(
(aVoid, throwable) ->
{
typeSink = null;
dispatchComplete = null;
if (throwable != null)
callback.fail(throwable);
else
callback.succeed();
});
}
else
{
// Non-fin-frame
frameCallback = callback;
}
typeSink.accept(frame, frameCallback);
}
}

View File

@ -19,87 +19,21 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class InputStreamMessageSink implements MessageSink
public class InputStreamMessageSink extends DispatchedMessageSink<InputStream, Void>
{
private static final Logger LOG = Log.getLogger(ReaderMessageSink.class);
private final Function<InputStream, Void> onStreamFunction;
private final Executor executor;
private MessageInputStream stream;
private CountDownLatch dispatchCompleted = new CountDownLatch(1);
public InputStreamMessageSink(Executor executor, Function<InputStream, Void> function)
{
this.executor = executor;
this.onStreamFunction = function;
super(executor, function);
}
@Override
public void accept(Frame frame, FrameCallback callback)
public MessageSink newSink(Frame frame)
{
try
{
boolean first = false;
if (stream == null)
{
stream = new MessageInputStream();
first = true;
}
stream.accept(frame, callback);
if (first)
{
dispatchCompleted = new CountDownLatch(1);
executor.execute(() -> {
final MessageInputStream dispatchedStream = stream;
try
{
onStreamFunction.apply(dispatchedStream);
}
catch (Throwable t)
{
// processing of errors is the responsibility
// of the stream function
if (LOG.isDebugEnabled())
{
LOG.debug("Unhandled throwable", t);
}
}
// Returned from dispatch, stream should be closed
IO.close(dispatchedStream);
dispatchCompleted.countDown();
});
}
}
finally
{
//noinspection Duplicates
if (frame.isFin())
{
if (LOG.isDebugEnabled())
LOG.debug("dispatch complete await() - {}", stream);
try
{
// TODO: remove
dispatchCompleted.await();
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug(e);
}
stream = null;
}
}
return new MessageInputStream();
}
}

View File

@ -82,7 +82,7 @@ public class MessageInputStream extends InputStream implements MessageSink
@Override
public void close() throws IOException
{
if(LOG.isDebugEnabled())
if (LOG.isDebugEnabled())
LOG.debug("close()");
if (closed.compareAndSet(false, true))
@ -98,7 +98,7 @@ public class MessageInputStream extends InputStream implements MessageSink
public FrameCallbackBuffer getActiveFrame() throws InterruptedIOException
{
if(activeFrame == null)
if (activeFrame == null)
{
// sync and poll queue
FrameCallbackBuffer result;
@ -126,12 +126,20 @@ public class MessageInputStream extends InputStream implements MessageSink
private void shutdown()
{
if(LOG.isDebugEnabled())
if (LOG.isDebugEnabled())
LOG.debug("shutdown()");
synchronized (buffers)
{
closed.set(true);
Throwable cause = new IOException("Shutdown");
for (FrameCallbackBuffer buffer : buffers)
{
buffer.callback.fail(cause);
}
// Removed buffers that may have remained in the queue.
buffers.clear();
}
}
@Override
public void mark(int readlimit)

View File

@ -19,89 +19,21 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.Reader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class ReaderMessageSink implements MessageSink
public class ReaderMessageSink extends DispatchedMessageSink<Reader,Void>
{
private static final Logger LOG = Log.getLogger(ReaderMessageSink.class);
private final Function<Reader, Void> onStreamFunction;
private final Executor executor;
private MessageReader stream;
private CountDownLatch dispatchCompleted = new CountDownLatch(1);
public ReaderMessageSink(Executor executor, Function<Reader, Void> function)
{
this.executor = executor;
this.onStreamFunction = function;
super(executor, function);
}
@Override
public void accept(Frame frame, FrameCallback callback)
public MessageReader newSink(Frame frame)
{
try
{
boolean first = false;
if (stream == null)
{
stream = new MessageReader(new MessageInputStream());
first = true;
}
stream.accept(frame, callback);
if (first)
{
dispatchCompleted = new CountDownLatch(1);
executor.execute(() ->
{
final MessageReader dispatchedStream = stream;
try
{
onStreamFunction.apply(dispatchedStream);
}
catch (Throwable t)
{
// processing of errors is the responsibility
// of the stream function
if (LOG.isDebugEnabled())
{
LOG.debug("Unhandled throwable", t);
}
}
if (LOG.isDebugEnabled())
LOG.debug("return from dispatch - {}", stream);
// Returned from dispatch, stream should be closed
IO.close(dispatchedStream);
dispatchCompleted.countDown();
});
}
}
finally
{
//noinspection Duplicates
if (frame.isFin())
{
if (LOG.isDebugEnabled())
LOG.debug("fin/dispatch complete await() - {}", stream);
try
{
dispatchCompleted.await();
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug(e);
}
stream = null;
}
}
return new MessageReader(new MessageInputStream());
}
}

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@Deprecated
public class Primitives
{
private static final Map<Class<?>, Class<?>> PRIMITIVE_CLASS_MAP;