435322 Added a idleTimeout to the SharedBlockerCallback

This commit is contained in:
Greg Wilkins 2014-09-03 10:46:18 +10:00
parent 7be9f0d7a4
commit 0a2aeb54a1
2 changed files with 116 additions and 24 deletions

View File

@ -48,7 +48,7 @@ import org.eclipse.jetty.util.thread.NonBlockingThread;
*/
public class SharedBlockingCallback
{
private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
final ReentrantLock _lock = new ReentrantLock();
final Condition _idle = _lock.newCondition();
@ -128,6 +128,12 @@ public class SharedBlockingCallback
return _blocker;
}
protected void notComplete(Blocker blocker)
{
LOG.warn("Blocker not complete {}",blocker);
if (LOG.isDebugEnabled())
LOG.debug(new Throwable());
}
/* ------------------------------------------------------------ */
/** A Closeable Callback.
@ -152,8 +158,8 @@ public class SharedBlockingCallback
_state = SUCCEEDED;
_complete.signalAll();
}
else if (_state == IDLE)
throw new IllegalStateException("IDLE");
else
throw new IllegalStateException(_state);
}
finally
{
@ -169,11 +175,17 @@ public class SharedBlockingCallback
{
if (_state == null)
{
_state = cause==null?FAILED:cause;
if (cause==null)
_state=FAILED;
else if (cause instanceof BlockerTimeoutException)
// Not this blockers timeout
_state=new IOException(cause);
else
_state=cause;
_complete.signalAll();
}
else if (_state == IDLE)
throw new IllegalStateException("IDLE",cause);
else
throw new IllegalStateException(_state);
}
finally
{
@ -202,15 +214,9 @@ public class SharedBlockingCallback
if (idle>0)
{
if (!_complete.await(idle,TimeUnit.MILLISECONDS))
{
// The callback has not arrived in sufficient time.
// We will synthesize a TimeoutException and then
// create a new Blocker, so that any late arriving callback
// does not cause a problem with the next cycle.
_state=new TimeoutException("No Blocker CB");
LOG.warn(_state);
_blocker=new Blocker();
}
// We will synthesize a TimeoutException
_state=new BlockerTimeoutException();
}
else
_complete.await();
@ -260,17 +266,19 @@ public class SharedBlockingCallback
if (_state == IDLE)
throw new IllegalStateException("IDLE");
if (_state == null)
{
LOG.warn("Blocker not complete {}",this);
if (LOG.isDebugEnabled())
LOG.debug(new Throwable());
}
notComplete(this);
}
finally
{
try
{
_state = IDLE;
// If the blocker timed itself out, remember the state
if (_state instanceof BlockerTimeoutException)
// and create a new Blocker
_blocker=new Blocker();
else
// else reuse Blocker
_state = IDLE;
_idle.signalAll();
_complete.signalAll();
}
@ -295,4 +303,8 @@ public class SharedBlockingCallback
}
}
}
private class BlockerTimeoutException extends TimeoutException
{
}
}

View File

@ -18,9 +18,17 @@
package org.eclipse.jetty.util;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers;
@ -29,7 +37,23 @@ import org.junit.Test;
public class SharedBlockingCallbackTest
{
final SharedBlockingCallback sbcb= new SharedBlockingCallback();
final AtomicInteger notComplete = new AtomicInteger();
final SharedBlockingCallback sbcb= new SharedBlockingCallback()
{
@Override
protected long getIdleTimeout()
{
return 150;
}
@Override
protected void notComplete(Blocker blocker)
{
super.notComplete(blocker);
notComplete.incrementAndGet();
}
};
public SharedBlockingCallbackTest()
{
@ -46,7 +70,8 @@ public class SharedBlockingCallbackTest
start=System.currentTimeMillis();
blocker.block();
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
Assert.assertEquals(0,notComplete.get());
}
@Test
@ -74,6 +99,7 @@ public class SharedBlockingCallbackTest
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertEquals(0,notComplete.get());
}
@Test
@ -95,7 +121,8 @@ public class SharedBlockingCallbackTest
start=System.currentTimeMillis();
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
Assert.assertEquals(0,notComplete.get());
}
@Test
@ -133,6 +160,7 @@ public class SharedBlockingCallbackTest
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertEquals(0,notComplete.get());
}
@ -174,6 +202,58 @@ public class SharedBlockingCallbackTest
blocker.succeeded();
blocker.block();
};
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
Assert.assertEquals(0,notComplete.get());
}
@Test
public void testBlockerClose() throws Exception
{
try (Blocker blocker=sbcb.acquire())
{
SharedBlockingCallback.LOG.info("Blocker not complete "+blocker+" warning is expected...");
}
Assert.assertEquals(1,notComplete.get());
}
@Test
public void testBlockerTimeout() throws Exception
{
Blocker b0=null;
try
{
try (Blocker blocker=sbcb.acquire())
{
b0=blocker;
Thread.sleep(400);
blocker.block();
}
fail();
}
catch(IOException e)
{
Throwable cause = e.getCause();
assertThat(cause,instanceOf(TimeoutException.class));
}
Assert.assertEquals(0,notComplete.get());
try (Blocker blocker=sbcb.acquire())
{
assertThat(blocker,not(equalTo(b0)));
try
{
b0.succeeded();
fail();
}
catch(Exception e)
{
assertThat(e,instanceOf(IllegalStateException.class));
assertThat(e.getCause(),instanceOf(TimeoutException.class));
}
blocker.succeeded();
}
}
}