430242 - added SharedBlockingCallback to support threadsafe blocking

This commit is contained in:
Greg Wilkins 2014-03-13 14:46:00 +11:00
parent af7dd4b99e
commit 4de73d508b
11 changed files with 412 additions and 78 deletions

View File

@ -728,7 +728,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
BlockingCallback writeBlock = _response.getHttpOutput().getWriteBlockingCallback();
BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback();
boolean committing=sendResponse(info,content,complete,writeBlock);
writeBlock.block();
return committing;

View File

@ -21,16 +21,16 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{
private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
private final BlockingCallback _readBlocker = new BlockingCallback();
private final SharedBlockingCallback _readBlocker = new SharedBlockingCallback();
private final HttpConnection _httpConnection;
private ByteBuffer _content;
@ -57,6 +57,7 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{
while(true)
{
_readBlocker.acquire();
_httpConnection.fillInterested(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
@ -33,6 +34,7 @@ import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@ -54,7 +56,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel<?> _channel;
private final BlockingCallback _writeblock=new BlockingCallback();
private final SharedBlockingCallback _writeblock=new SharedBlockingCallback();
private long _written;
private ByteBuffer _aggregate;
private int _bufferSize;
@ -114,13 +116,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _channel.getResponse().isAllContentWritten(_written);
}
protected BlockingCallback getWriteBlockingCallback()
protected BlockingCallback acquireWriteBlockingCallback() throws IOException
{
_writeblock.acquire();
return _writeblock;
}
protected void write(ByteBuffer content, boolean complete) throws IOException
{
_writeblock.acquire();
write(content,complete,_writeblock);
_writeblock.block();
}
@ -435,6 +439,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
_writeblock.acquire();
write(_aggregate, complete, _writeblock);
_writeblock.block();
if (complete)
@ -492,6 +497,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ByteBuffer content) throws IOException
{
_writeblock.acquire();
write(content,true,_writeblock);
_writeblock.block();
}
@ -503,6 +509,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(InputStream in) throws IOException
{
_writeblock.acquire();
new InputStreamWritingCB(in,_writeblock).iterate();
_writeblock.block();
}
@ -514,6 +521,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
_writeblock.acquire();
new ReadableByteChannelWritingCB(in,_writeblock).iterate();
_writeblock.block();
}
@ -526,6 +534,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(HttpContent content) throws IOException
{
_writeblock.acquire();
sendContent(content,_writeblock);
_writeblock.block();
}

View File

@ -21,8 +21,8 @@ package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/**
@ -53,10 +53,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BlockingCallback implements Callback
{
private static Throwable COMPLETED=new Throwable();
private final AtomicBoolean _done=new AtomicBoolean(false);
private final Semaphore _semaphore = new Semaphore(0);
private Throwable _cause;
private static Throwable SUCCEEDED=new Throwable()
{
@Override
public String toString() { return "SUCCEEDED"; }
};
private final CountDownLatch _latch = new CountDownLatch(1);
private final AtomicReference<Throwable> _state = new AtomicReference<>();
public BlockingCallback()
{}
@ -64,21 +68,15 @@ public class BlockingCallback implements Callback
@Override
public void succeeded()
{
if (_done.compareAndSet(false,true))
{
_cause=COMPLETED;
_semaphore.release();
}
if (_state.compareAndSet(null,SUCCEEDED))
_latch.countDown();
}
@Override
public void failed(Throwable cause)
{
if (_done.compareAndSet(false,true))
{
_cause=cause;
_semaphore.release();
}
if (_state.compareAndSet(null,cause))
_latch.countDown();
}
/** Block until the Callback has succeeded or failed and
@ -91,14 +89,15 @@ public class BlockingCallback implements Callback
{
try
{
_semaphore.acquire();
if (_cause==COMPLETED)
_latch.await();
Throwable state=_state.get();
if (state==SUCCEEDED)
return;
if (_cause instanceof IOException)
throw (IOException) _cause;
if (_cause instanceof CancellationException)
throw (CancellationException) _cause;
throw new IOException(_cause);
if (state instanceof IOException)
throw (IOException) state;
if (state instanceof CancellationException)
throw (CancellationException) state;
throw new IOException(state);
}
catch (final InterruptedException e)
{
@ -106,8 +105,7 @@ public class BlockingCallback implements Callback
}
finally
{
_done.set(false);
_cause=null;
_state.set(null);
}
}
@ -115,7 +113,7 @@ public class BlockingCallback implements Callback
@Override
public String toString()
{
return String.format("%s@%x{%b,%b}",BlockingCallback.class.getSimpleName(),hashCode(),_done.get(),_cause==COMPLETED);
return String.format("%s@%x{%s}",BlockingCallback.class.getSimpleName(),hashCode(),_state.get());
}
}

View File

@ -0,0 +1,191 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/* ------------------------------------------------------------ */
/**
* A Callback for simple reusable conversion of an
* asynchronous API to blocking.
* <p>
* To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
* interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
* and only a single callback per phase is allowed.
* <p>
* A typical usage pattern is:
* <pre>
* public class MyClass
* {
* BlockingCallback cb = new BlockingCallback();
*
* public void blockingMethod(Object args) throws Exception
* {
* asyncMethod(args,cb);
* cb.block();
* }
*
* public <C>void asyncMethod(Object args, Callback callback)
* {
* ...
* }
* }
*/
public class SharedBlockingCallback extends BlockingCallback
{
private static Throwable IDLE=new Throwable()
{
@Override
public String toString() { return "IDLE"; }
};
private static Throwable SUCCEEDED=new Throwable()
{
@Override
public String toString() { return "SUCCEEDED"; }
};
final ReentrantLock _lock = new ReentrantLock();
Condition _idle = _lock.newCondition();
Condition _complete = _lock.newCondition();
Throwable _state = IDLE;
public SharedBlockingCallback()
{}
public void acquire() throws IOException
{
_lock.lock();
try
{
while (_state!=IDLE)
_idle.await();
_state=null;
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
}
finally
{
_lock.unlock();
}
}
@Override
public void succeeded()
{
_lock.lock();
try
{
if (_state==null)
{
_state=SUCCEEDED;
_complete.signalAll();
}
else if (_state==IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
@Override
public void failed(Throwable cause)
{
_lock.lock();
try
{
if (_state==null)
{
_state=cause;
_complete.signalAll();
}
else if (_state==IDLE)
throw new IllegalStateException("IDLE");
}
finally
{
_lock.unlock();
}
}
/** Block until the Callback has succeeded or failed and
* after the return leave in the state to allow reuse.
* This is useful for code that wants to repeatable use a FutureCallback to convert
* an asynchronous API to a blocking API.
* @throws IOException if exception was caught during blocking, or callback was cancelled
*/
@Override
public void block() throws IOException
{
_lock.lock();
try
{
while (_state==null)
_complete.await();
if (_state==SUCCEEDED)
return;
if (_state==IDLE)
throw new IllegalStateException("IDLE");
if (_state instanceof IOException)
throw (IOException) _state;
if (_state instanceof CancellationException)
throw (CancellationException) _state;
throw new IOException(_state);
}
catch (final InterruptedException e)
{
throw new InterruptedIOException(){{initCause(e);}};
}
finally
{
_state=IDLE;
_idle.signalAll();
_lock.unlock();
}
}
@Override
public String toString()
{
_lock.lock();
try
{
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
}
finally
{
_lock.unlock();
}
}
}

View File

@ -32,56 +32,16 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class BlockingCallbackTest
{
private final static BlockingCallback reused= new BlockingCallback();
interface Factory
public BlockingCallbackTest()
{
BlockingCallback newBlockingCallback();
}
@Parameters
public static Collection<Object[]> data()
{
List<Object[]> data = new ArrayList<>();
data.add(new Factory[] { new Factory() {
@Override
public BlockingCallback newBlockingCallback()
{
return new BlockingCallback();
}
}});
data.add(new Factory[] { new Factory() {
@Override
public BlockingCallback newBlockingCallback()
{
return reused;
}
}});
data.add(new Factory[] { new Factory() {
@Override
public BlockingCallback newBlockingCallback()
{
return reused;
}
}});
return data;
}
final private Factory _factory;
public BlockingCallbackTest(Factory factory)
{
_factory=factory;
}
@Test
public void testDone() throws Exception
{
BlockingCallback fcb= _factory.newBlockingCallback();
final BlockingCallback fcb= new BlockingCallback();
fcb.succeeded();
long start=System.currentTimeMillis();
fcb.block();
@ -91,7 +51,7 @@ public class BlockingCallbackTest
@Test
public void testGetDone() throws Exception
{
final BlockingCallback fcb= _factory.newBlockingCallback();
final BlockingCallback fcb= new BlockingCallback();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
@ -115,7 +75,7 @@ public class BlockingCallbackTest
@Test
public void testFailed() throws Exception
{
BlockingCallback fcb= _factory.newBlockingCallback();
final BlockingCallback fcb= new BlockingCallback();
Exception ex=new Exception("FAILED");
fcb.failed(ex);
@ -135,7 +95,7 @@ public class BlockingCallbackTest
@Test
public void testGetFailed() throws Exception
{
final BlockingCallback fcb= _factory.newBlockingCallback();
final BlockingCallback fcb= new BlockingCallback();
final Exception ex=new Exception("FAILED");
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -0,0 +1,168 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
public class SharedBlockingCallbackTest
{
final SharedBlockingCallback fcb= new SharedBlockingCallback();
public SharedBlockingCallbackTest()
{
}
@Test
public void testDone() throws Exception
{
fcb.acquire();
fcb.succeeded();
long start=System.currentTimeMillis();
fcb.block();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
}
@Test
public void testGetDone() throws Exception
{
fcb.acquire();
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.succeeded();
}
}).start();
latch.await();
long start=System.currentTimeMillis();
fcb.block();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
@Test
public void testFailed() throws Exception
{
fcb.acquire();
Exception ex=new Exception("FAILED");
fcb.failed(ex);
long start=System.currentTimeMillis();
try
{
fcb.block();
Assert.fail();
}
catch(IOException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
}
@Test
public void testGetFailed() throws Exception
{
fcb.acquire();
final Exception ex=new Exception("FAILED");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
latch.countDown();
try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
fcb.failed(ex);
}
}).start();
latch.await();
long start=System.currentTimeMillis();
try
{
fcb.block();
Assert.fail();
}
catch(IOException ee)
{
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
@Test
public void testAcquireBlocked() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
fcb.acquire();
latch.countDown();
TimeUnit.MILLISECONDS.sleep(100);
fcb.succeeded();
fcb.block();
}
catch(Exception e)
{
e.printStackTrace();
}
}
}).start();
latch.await();
long start=System.currentTimeMillis();
fcb.acquire();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
fcb.succeeded();
fcb.block();
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
}
}

View File

@ -18,11 +18,14 @@
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
public class BlockingWriteCallback extends BlockingCallback implements WriteCallback
public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback
{
public BlockingWriteCallback()
{}
@Override
public void writeFailed(Throwable x)
{

View File

@ -100,6 +100,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException
{
blocker.acquire();
uncheckedSendFrame(frame, blocker);
blocker.block();
}
@ -445,6 +446,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
lockMsg(MsgType.ASYNC);
try
{
blocker.acquire();
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker);
blocker.block();
}

View File

@ -142,6 +142,7 @@ public class MessageOutputStream extends OutputStream
frame.setPayload(buffer);
frame.setFin(fin);
blocker.acquire();
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();

View File

@ -146,6 +146,7 @@ public class MessageWriter extends Writer
frame.setPayload(data);
frame.setFin(fin);
blocker.acquire();
outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();