Revert "430242 - added SharedBlockingCallback to support threadsafe blocking"
This reverts commit 9c30e7a303
.
This commit is contained in:
parent
bbfb5c7237
commit
138dfba560
|
@ -53,7 +53,6 @@ import org.eclipse.jetty.util.BlockingCallback;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.URIUtil;
|
import org.eclipse.jetty.util.URIUtil;
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
|
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
@ -729,10 +728,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
|
||||||
|
|
||||||
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
|
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
|
||||||
{
|
{
|
||||||
try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback())
|
BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback();
|
||||||
{
|
boolean committing=sendResponse(info,content,complete,writeBlock);
|
||||||
return sendResponse(info,content,complete,blocker);
|
writeBlock.block();
|
||||||
}
|
return committing;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isCommitted()
|
public boolean isCommitted()
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
|
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
|
||||||
|
@ -58,11 +57,10 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
|
||||||
{
|
{
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
try (Blocker blocker=_readBlocker.acquire())
|
_readBlocker.acquire();
|
||||||
{
|
_httpConnection.fillInterested(_readBlocker);
|
||||||
_httpConnection.fillInterested(blocker);
|
LOG.debug("{} block readable on {}",this,_readBlocker);
|
||||||
LOG.debug("{} block readable on {}",this,blocker);
|
_readBlocker.block();
|
||||||
}
|
|
||||||
|
|
||||||
Object content=getNextContent();
|
Object content=getNextContent();
|
||||||
if (content!=null || isFinished())
|
if (content!=null || isFinished())
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.eclipse.jetty.http.HttpContent;
|
||||||
import org.eclipse.jetty.io.EofException;
|
import org.eclipse.jetty.io.EofException;
|
||||||
import org.eclipse.jetty.util.BlockingCallback;
|
import org.eclipse.jetty.util.BlockingCallback;
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
|
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.IteratingCallback;
|
import org.eclipse.jetty.util.IteratingCallback;
|
||||||
|
@ -117,17 +116,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
return _channel.getResponse().isAllContentWritten(_written);
|
return _channel.getResponse().isAllContentWritten(_written);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Blocker acquireWriteBlockingCallback() throws IOException
|
protected BlockingCallback acquireWriteBlockingCallback() throws IOException
|
||||||
{
|
{
|
||||||
return _writeblock.acquire();
|
_writeblock.acquire();
|
||||||
|
return _writeblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void write(ByteBuffer content, boolean complete) throws IOException
|
protected void write(ByteBuffer content, boolean complete) throws IOException
|
||||||
{
|
{
|
||||||
try (Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
write(content,complete,_writeblock);
|
||||||
write(content,complete,blocker);
|
_writeblock.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void write(ByteBuffer content, boolean complete, Callback callback)
|
protected void write(ByteBuffer content, boolean complete, Callback callback)
|
||||||
|
@ -440,10 +439,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
// Check if all written or full
|
// Check if all written or full
|
||||||
if (complete || BufferUtil.isFull(_aggregate))
|
if (complete || BufferUtil.isFull(_aggregate))
|
||||||
{
|
{
|
||||||
try(Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
write(_aggregate, complete, _writeblock);
|
||||||
write(_aggregate, complete, blocker);
|
_writeblock.block();
|
||||||
}
|
|
||||||
if (complete)
|
if (complete)
|
||||||
closed();
|
closed();
|
||||||
}
|
}
|
||||||
|
@ -499,10 +497,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
*/
|
*/
|
||||||
public void sendContent(ByteBuffer content) throws IOException
|
public void sendContent(ByteBuffer content) throws IOException
|
||||||
{
|
{
|
||||||
try(Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
write(content,true,_writeblock);
|
||||||
write(content,true,blocker);
|
_writeblock.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
|
@ -512,10 +509,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
*/
|
*/
|
||||||
public void sendContent(InputStream in) throws IOException
|
public void sendContent(InputStream in) throws IOException
|
||||||
{
|
{
|
||||||
try(Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
new InputStreamWritingCB(in,_writeblock).iterate();
|
||||||
new InputStreamWritingCB(in,blocker).iterate();
|
_writeblock.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
|
@ -525,10 +521,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
*/
|
*/
|
||||||
public void sendContent(ReadableByteChannel in) throws IOException
|
public void sendContent(ReadableByteChannel in) throws IOException
|
||||||
{
|
{
|
||||||
try(Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
new ReadableByteChannelWritingCB(in,_writeblock).iterate();
|
||||||
new ReadableByteChannelWritingCB(in,blocker).iterate();
|
_writeblock.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -539,10 +534,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
*/
|
*/
|
||||||
public void sendContent(HttpContent content) throws IOException
|
public void sendContent(HttpContent content) throws IOException
|
||||||
{
|
{
|
||||||
try(Blocker blocker=_writeblock.acquire())
|
_writeblock.acquire();
|
||||||
{
|
sendContent(content,_writeblock);
|
||||||
sendContent(content,blocker);
|
_writeblock.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
|
|
|
@ -26,7 +26,30 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
/**
|
/**
|
||||||
* TODO
|
* A Callback for simple reusable conversion of an
|
||||||
|
* asynchronous API to blocking.
|
||||||
|
* <p>
|
||||||
|
* To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
|
||||||
|
* interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
|
||||||
|
* and only a single callback per phase is allowed.
|
||||||
|
* <p>
|
||||||
|
* A typical usage pattern is:
|
||||||
|
* <pre>
|
||||||
|
* public class MyClass
|
||||||
|
* {
|
||||||
|
* BlockingCallback cb = new BlockingCallback();
|
||||||
|
*
|
||||||
|
* public void blockingMethod(Object args) throws Exception
|
||||||
|
* {
|
||||||
|
* asyncMethod(args,cb);
|
||||||
|
* cb.block();
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* public <C>void asyncMethod(Object args, Callback callback)
|
||||||
|
* {
|
||||||
|
* ...
|
||||||
|
* }
|
||||||
|
* }
|
||||||
*/
|
*/
|
||||||
public class BlockingCallback implements Callback
|
public class BlockingCallback implements Callback
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,203 +18,174 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.util;
|
package org.eclipse.jetty.util;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
/** Provides a reusable BlockingCallback.
|
/**
|
||||||
|
* A Callback for simple reusable conversion of an
|
||||||
|
* asynchronous API to blocking.
|
||||||
|
* <p>
|
||||||
|
* To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
|
||||||
|
* interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
|
||||||
|
* and only a single callback per phase is allowed.
|
||||||
|
* <p>
|
||||||
* A typical usage pattern is:
|
* A typical usage pattern is:
|
||||||
* <pre>
|
* <pre>
|
||||||
* void someBlockingCall(Object... args) throws IOException
|
* public class MyClass
|
||||||
* {
|
* {
|
||||||
* try(Blocker blocker=sharedBlockingCallback.acquire())
|
* BlockingCallback cb = new BlockingCallback();
|
||||||
* {
|
*
|
||||||
* someAsyncCall(args,blocker);
|
* public void blockingMethod(Object args) throws Exception
|
||||||
* }
|
* {
|
||||||
* catch(Throwable e)
|
* asyncMethod(args,cb);
|
||||||
* {
|
* cb.block();
|
||||||
* blocker.fail(e);
|
* }
|
||||||
* }
|
*
|
||||||
* }
|
* public <C>void asyncMethod(Object args, Callback callback)
|
||||||
* </pre>
|
* {
|
||||||
|
* ...
|
||||||
|
* }
|
||||||
|
* }
|
||||||
*/
|
*/
|
||||||
public class SharedBlockingCallback
|
public class SharedBlockingCallback extends BlockingCallback
|
||||||
{
|
{
|
||||||
private static Throwable IDLE = new Throwable()
|
private static Throwable IDLE=new Throwable()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString() { return "IDLE"; }
|
||||||
{
|
|
||||||
return "IDLE";
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
private static Throwable SUCCEEDED = new Throwable()
|
private static Throwable SUCCEEDED=new Throwable()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString() { return "SUCCEEDED"; }
|
||||||
{
|
|
||||||
return "SUCCEEDED";
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
final Blocker _blocker;
|
|
||||||
|
final ReentrantLock _lock = new ReentrantLock();
|
||||||
|
Condition _idle = _lock.newCondition();
|
||||||
|
Condition _complete = _lock.newCondition();
|
||||||
|
Throwable _state = IDLE;
|
||||||
|
|
||||||
|
|
||||||
public SharedBlockingCallback()
|
public SharedBlockingCallback()
|
||||||
{
|
{}
|
||||||
this(new Blocker());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected SharedBlockingCallback(Blocker blocker)
|
public void acquire() throws IOException
|
||||||
{
|
{
|
||||||
_blocker=blocker;
|
_lock.lock();
|
||||||
}
|
|
||||||
|
|
||||||
public Blocker acquire() throws IOException
|
|
||||||
{
|
|
||||||
_blocker._lock.lock();
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
while (_blocker._state != IDLE)
|
while (_state!=IDLE)
|
||||||
_blocker._idle.await();
|
_idle.await();
|
||||||
_blocker._state = null;
|
_state=null;
|
||||||
}
|
}
|
||||||
catch (final InterruptedException e)
|
catch (final InterruptedException e)
|
||||||
{
|
{
|
||||||
throw new InterruptedIOException()
|
throw new InterruptedIOException(){{initCause(e);}};
|
||||||
{
|
|
||||||
{
|
|
||||||
initCause(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
_blocker._lock.unlock();
|
_lock.unlock();
|
||||||
}
|
}
|
||||||
return _blocker;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
/* ------------------------------------------------------------ */
|
public void succeeded()
|
||||||
/** A Closeable Callback.
|
|
||||||
* Uses the auto close mechanism to block until the collback is complete.
|
|
||||||
*/
|
|
||||||
public static class Blocker implements Callback, Closeable
|
|
||||||
{
|
{
|
||||||
final ReentrantLock _lock = new ReentrantLock();
|
_lock.lock();
|
||||||
final Condition _idle = _lock.newCondition();
|
try
|
||||||
final Condition _complete = _lock.newCondition();
|
|
||||||
Throwable _state = IDLE;
|
|
||||||
|
|
||||||
public Blocker()
|
|
||||||
{
|
{
|
||||||
|
if (_state==null)
|
||||||
|
{
|
||||||
|
_state=SUCCEEDED;
|
||||||
|
_complete.signalAll();
|
||||||
|
}
|
||||||
|
else if (_state==IDLE)
|
||||||
|
throw new IllegalStateException("IDLE");
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
@Override
|
|
||||||
public void succeeded()
|
|
||||||
{
|
{
|
||||||
_lock.lock();
|
_lock.unlock();
|
||||||
try
|
|
||||||
{
|
|
||||||
if (_state == null)
|
|
||||||
{
|
|
||||||
_state = SUCCEEDED;
|
|
||||||
_complete.signalAll();
|
|
||||||
}
|
|
||||||
else if (_state == IDLE)
|
|
||||||
throw new IllegalStateException("IDLE");
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failed(Throwable cause)
|
|
||||||
{
|
|
||||||
_lock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (_state == null)
|
|
||||||
{
|
|
||||||
_state = cause;
|
|
||||||
_complete.signalAll();
|
|
||||||
}
|
|
||||||
else if (_state == IDLE)
|
|
||||||
throw new IllegalStateException("IDLE");
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
|
|
||||||
* repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
* if exception was caught during blocking, or callback was cancelled
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException
|
|
||||||
{
|
|
||||||
_lock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
while (_state == null)
|
|
||||||
_complete.await();
|
|
||||||
|
|
||||||
if (_state == SUCCEEDED)
|
|
||||||
return;
|
|
||||||
if (_state == IDLE)
|
|
||||||
throw new IllegalStateException("IDLE");
|
|
||||||
if (_state instanceof IOException)
|
|
||||||
throw (IOException)_state;
|
|
||||||
if (_state instanceof CancellationException)
|
|
||||||
throw (CancellationException)_state;
|
|
||||||
if (_state instanceof RuntimeException)
|
|
||||||
throw (RuntimeException)_state;
|
|
||||||
if (_state instanceof Error)
|
|
||||||
throw (Error)_state;
|
|
||||||
throw new IOException(_state);
|
|
||||||
}
|
|
||||||
catch (final InterruptedException e)
|
|
||||||
{
|
|
||||||
throw new InterruptedIOException()
|
|
||||||
{
|
|
||||||
{
|
|
||||||
initCause(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_state = IDLE;
|
|
||||||
_idle.signalAll();
|
|
||||||
_lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString()
|
|
||||||
{
|
|
||||||
_lock.lock();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(Throwable cause)
|
||||||
|
{
|
||||||
|
_lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_state==null)
|
||||||
|
{
|
||||||
|
_state=cause;
|
||||||
|
_complete.signalAll();
|
||||||
|
}
|
||||||
|
else if (_state==IDLE)
|
||||||
|
throw new IllegalStateException("IDLE");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Block until the Callback has succeeded or failed and
|
||||||
|
* after the return leave in the state to allow reuse.
|
||||||
|
* This is useful for code that wants to repeatable use a FutureCallback to convert
|
||||||
|
* an asynchronous API to a blocking API.
|
||||||
|
* @throws IOException if exception was caught during blocking, or callback was cancelled
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void block() throws IOException
|
||||||
|
{
|
||||||
|
_lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while (_state==null)
|
||||||
|
_complete.await();
|
||||||
|
|
||||||
|
if (_state==SUCCEEDED)
|
||||||
|
return;
|
||||||
|
if (_state==IDLE)
|
||||||
|
throw new IllegalStateException("IDLE");
|
||||||
|
if (_state instanceof IOException)
|
||||||
|
throw (IOException) _state;
|
||||||
|
if (_state instanceof CancellationException)
|
||||||
|
throw (CancellationException) _state;
|
||||||
|
throw new IOException(_state);
|
||||||
|
}
|
||||||
|
catch (final InterruptedException e)
|
||||||
|
{
|
||||||
|
throw new InterruptedIOException(){{initCause(e);}};
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_state=IDLE;
|
||||||
|
_idle.signalAll();
|
||||||
|
_lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
_lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,17 +19,22 @@
|
||||||
package org.eclipse.jetty.util;
|
package org.eclipse.jetty.util;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
|
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
public class SharedBlockingCallbackTest
|
public class SharedBlockingCallbackTest
|
||||||
{
|
{
|
||||||
final SharedBlockingCallback sbcb= new SharedBlockingCallback();
|
final SharedBlockingCallback fcb= new SharedBlockingCallback();
|
||||||
|
|
||||||
public SharedBlockingCallbackTest()
|
public SharedBlockingCallbackTest()
|
||||||
{
|
{
|
||||||
|
@ -39,37 +44,33 @@ public class SharedBlockingCallbackTest
|
||||||
@Test
|
@Test
|
||||||
public void testDone() throws Exception
|
public void testDone() throws Exception
|
||||||
{
|
{
|
||||||
long start;
|
fcb.acquire();
|
||||||
try (Blocker blocker=sbcb.acquire())
|
fcb.succeeded();
|
||||||
{
|
long start=System.currentTimeMillis();
|
||||||
blocker.succeeded();
|
fcb.block();
|
||||||
start=System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetDone() throws Exception
|
public void testGetDone() throws Exception
|
||||||
{
|
{
|
||||||
long start;
|
fcb.acquire();
|
||||||
try (final Blocker blocker=sbcb.acquire())
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
new Thread(new Runnable()
|
||||||
{
|
{
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
@Override
|
||||||
|
public void run()
|
||||||
new Thread(new Runnable()
|
|
||||||
{
|
{
|
||||||
@Override
|
latch.countDown();
|
||||||
public void run()
|
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
|
||||||
{
|
fcb.succeeded();
|
||||||
latch.countDown();
|
}
|
||||||
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
|
}).start();
|
||||||
blocker.succeeded();
|
|
||||||
}
|
|
||||||
}).start();
|
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
start=System.currentTimeMillis();
|
long start=System.currentTimeMillis();
|
||||||
}
|
fcb.block();
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
|
||||||
}
|
}
|
||||||
|
@ -77,19 +78,18 @@ public class SharedBlockingCallbackTest
|
||||||
@Test
|
@Test
|
||||||
public void testFailed() throws Exception
|
public void testFailed() throws Exception
|
||||||
{
|
{
|
||||||
final Exception ex = new Exception("FAILED");
|
fcb.acquire();
|
||||||
long start=Long.MIN_VALUE;
|
Exception ex=new Exception("FAILED");
|
||||||
|
fcb.failed(ex);
|
||||||
|
|
||||||
|
long start=System.currentTimeMillis();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
try (final Blocker blocker=sbcb.acquire())
|
fcb.block();
|
||||||
{
|
|
||||||
blocker.failed(ex);
|
|
||||||
}
|
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
catch(IOException ee)
|
catch(IOException ee)
|
||||||
{
|
{
|
||||||
start=System.currentTimeMillis();
|
|
||||||
Assert.assertEquals(ex,ee.getCause());
|
Assert.assertEquals(ex,ee.getCause());
|
||||||
}
|
}
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
|
||||||
|
@ -98,29 +98,26 @@ public class SharedBlockingCallbackTest
|
||||||
@Test
|
@Test
|
||||||
public void testGetFailed() throws Exception
|
public void testGetFailed() throws Exception
|
||||||
{
|
{
|
||||||
final Exception ex = new Exception("FAILED");
|
fcb.acquire();
|
||||||
long start=Long.MIN_VALUE;
|
final Exception ex=new Exception("FAILED");
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
new Thread(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
|
||||||
|
fcb.failed(ex);
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
long start=System.currentTimeMillis();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
try (final Blocker blocker=sbcb.acquire())
|
fcb.block();
|
||||||
{
|
|
||||||
|
|
||||||
new Thread(new Runnable()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void run()
|
|
||||||
{
|
|
||||||
latch.countDown();
|
|
||||||
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
|
|
||||||
blocker.failed(ex);
|
|
||||||
}
|
|
||||||
}).start();
|
|
||||||
|
|
||||||
latch.await();
|
|
||||||
start=System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
catch(IOException ee)
|
catch(IOException ee)
|
||||||
|
@ -144,12 +141,11 @@ public class SharedBlockingCallbackTest
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
try (Blocker blocker=sbcb.acquire())
|
fcb.acquire();
|
||||||
{
|
latch.countDown();
|
||||||
latch.countDown();
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
TimeUnit.MILLISECONDS.sleep(100);
|
fcb.succeeded();
|
||||||
blocker.succeeded();
|
fcb.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch(Exception e)
|
catch(Exception e)
|
||||||
{
|
{
|
||||||
|
@ -161,13 +157,12 @@ public class SharedBlockingCallbackTest
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
long start=System.currentTimeMillis();
|
long start=System.currentTimeMillis();
|
||||||
try (Blocker blocker=sbcb.acquire())
|
fcb.acquire();
|
||||||
{
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
|
|
||||||
|
|
||||||
blocker.succeeded();
|
fcb.succeeded();
|
||||||
};
|
fcb.block();
|
||||||
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
|
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,39 +18,23 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.common;
|
package org.eclipse.jetty.websocket.common;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
|
|
||||||
|
public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
/** extend a SharedlBlockingCallback to an websocket WriteCallback
|
|
||||||
*/
|
|
||||||
public class BlockingWriteCallback extends SharedBlockingCallback
|
|
||||||
{
|
{
|
||||||
public BlockingWriteCallback()
|
public BlockingWriteCallback()
|
||||||
|
{}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeFailed(Throwable x)
|
||||||
{
|
{
|
||||||
super(new WriteBlocker());
|
failed(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
public WriteBlocker acquireWriteBlocker() throws IOException
|
@Override
|
||||||
|
public void writeSuccess()
|
||||||
{
|
{
|
||||||
return (WriteBlocker)acquire();
|
succeeded();
|
||||||
}
|
|
||||||
|
|
||||||
public static class WriteBlocker extends Blocker implements WriteCallback
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void writeFailed(Throwable x)
|
|
||||||
{
|
|
||||||
failed(x);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeSuccess()
|
|
||||||
{
|
|
||||||
succeeded();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.api.BatchMode;
|
||||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
|
|
||||||
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
||||||
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
|
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
|
||||||
import org.eclipse.jetty.websocket.common.frames.DataFrame;
|
import org.eclipse.jetty.websocket.common.frames.DataFrame;
|
||||||
|
@ -101,10 +100,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
||||||
|
|
||||||
private void blockingWrite(WebSocketFrame frame) throws IOException
|
private void blockingWrite(WebSocketFrame frame) throws IOException
|
||||||
{
|
{
|
||||||
try(WriteBlocker b=blocker.acquireWriteBlocker())
|
blocker.acquire();
|
||||||
{
|
uncheckedSendFrame(frame, blocker);
|
||||||
uncheckedSendFrame(frame, b);
|
blocker.block();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean lockMsg(MsgType type)
|
private boolean lockMsg(MsgType type)
|
||||||
|
@ -443,13 +441,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
||||||
this.batchMode = batchMode;
|
this.batchMode = batchMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void flush() throws IOException
|
public void flush() throws IOException
|
||||||
{
|
{
|
||||||
lockMsg(MsgType.ASYNC);
|
lockMsg(MsgType.ASYNC);
|
||||||
try (WriteBlocker b = blocker.acquireWriteBlocker())
|
try
|
||||||
{
|
{
|
||||||
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
|
blocker.acquire();
|
||||||
|
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker);
|
||||||
|
blocker.block();
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
|
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
|
||||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
|
|
||||||
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -143,10 +142,9 @@ public class MessageOutputStream extends OutputStream
|
||||||
frame.setPayload(buffer);
|
frame.setPayload(buffer);
|
||||||
frame.setFin(fin);
|
frame.setFin(fin);
|
||||||
|
|
||||||
try(WriteBlocker b=blocker.acquireWriteBlocker())
|
blocker.acquire();
|
||||||
{
|
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
|
||||||
outgoing.outgoingFrame(frame, b, BatchMode.OFF);
|
blocker.block();
|
||||||
}
|
|
||||||
|
|
||||||
++frameCount;
|
++frameCount;
|
||||||
// Any flush after the first will be a CONTINUATION frame.
|
// Any flush after the first will be a CONTINUATION frame.
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
|
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
|
||||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
|
|
||||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -147,10 +146,9 @@ public class MessageWriter extends Writer
|
||||||
frame.setPayload(data);
|
frame.setPayload(data);
|
||||||
frame.setFin(fin);
|
frame.setFin(fin);
|
||||||
|
|
||||||
try (WriteBlocker b = blocker.acquireWriteBlocker())
|
blocker.acquire();
|
||||||
{
|
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
|
||||||
outgoing.outgoingFrame(frame, b, BatchMode.OFF);
|
blocker.block();
|
||||||
}
|
|
||||||
|
|
||||||
++frameCount;
|
++frameCount;
|
||||||
// Any flush after the first will be a CONTINUATION frame.
|
// Any flush after the first will be a CONTINUATION frame.
|
||||||
|
|
Loading…
Reference in New Issue