465606 - IteratingCallback.close() does not fail pending callback.

Fixed by failing the IteratingCallback when in state PROCESSING.
This commit is contained in:
Simone Bordet 2015-04-27 17:23:04 +02:00
parent aa8ea82b28
commit 71efdf0f89
2 changed files with 133 additions and 104 deletions

View File

@ -411,7 +411,6 @@ public abstract class IteratingCallback implements Callback
case IDLE: case IDLE:
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
case PROCESSING:
_state=State.CLOSED; _state=State.CLOSED;
break; break;

View File

@ -24,40 +24,40 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.AfterClass; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class IteratingCallbackTest public class IteratingCallbackTest
{ {
static Scheduler scheduler = new ScheduledExecutorScheduler(); private Scheduler scheduler;
@BeforeClass @Before
public static void beforeClass() throws Exception public void prepare() throws Exception
{ {
scheduler = new ScheduledExecutorScheduler();
scheduler.start(); scheduler.start();
} }
@AfterClass @After
public static void afterClass() throws Exception public void dispose() throws Exception
{ {
scheduler.stop(); scheduler.stop();
} }
@Test @Test
public void testNonWaitingProcess() throws Exception public void testNonWaitingProcess() throws Exception
{ {
TestCB cb = new TestCB()
TestCB cb=new TestCB()
{ {
int i=10; int i = 10;
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-- > 1)
{ {
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
return Action.SCHEDULED; return Action.SCHEDULED;
@ -65,61 +65,59 @@ public class IteratingCallbackTest
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
}; };
cb.iterate(); cb.iterate();
Assert.assertTrue(cb.waitForComplete()); Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(10,cb.processed); Assert.assertEquals(10, cb.processed);
} }
@Test @Test
public void testWaitingProcess() throws Exception public void testWaitingProcess() throws Exception
{ {
TestCB cb=new TestCB() TestCB cb = new TestCB()
{ {
int i=4; int i = 4;
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-- > 1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask, 50, TimeUnit.MILLISECONDS);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
}; };
cb.iterate(); cb.iterate();
Assert.assertTrue(cb.waitForComplete()); Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(4,cb.processed); Assert.assertEquals(4, cb.processed);
} }
@Test @Test
public void testWaitingProcessSpuriousInterate() throws Exception public void testWaitingProcessSpuriousIterate() throws Exception
{ {
final TestCB cb=new TestCB() final TestCB cb = new TestCB()
{ {
int i=4; int i = 4;
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-- > 1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask, 50, TimeUnit.MILLISECONDS);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
}; };
cb.iterate(); cb.iterate();
scheduler.schedule(new Runnable() scheduler.schedule(new Runnable()
{ {
@ -128,29 +126,29 @@ public class IteratingCallbackTest
{ {
cb.iterate(); cb.iterate();
if (!cb.isSucceeded()) if (!cb.isSucceeded())
scheduler.schedule(this,50,TimeUnit.MILLISECONDS); scheduler.schedule(this, 50, TimeUnit.MILLISECONDS);
} }
},49,TimeUnit.MILLISECONDS); }, 49, TimeUnit.MILLISECONDS);
Assert.assertTrue(cb.waitForComplete()); Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(4,cb.processed); Assert.assertEquals(4, cb.processed);
} }
@Test @Test
public void testNonWaitingProcessFailure() throws Exception public void testNonWaitingProcessFailure() throws Exception
{ {
TestCB cb=new TestCB() TestCB cb = new TestCB()
{ {
int i=10; int i = 10;
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-- > 1)
{ {
if (i>5) if (i > 5)
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
else else
failed(new Exception("testing")); failed(new Exception("testing"));
@ -159,63 +157,63 @@ public class IteratingCallbackTest
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
}; };
cb.iterate(); cb.iterate();
Assert.assertFalse(cb.waitForComplete()); Assert.assertFalse(cb.waitForComplete());
Assert.assertEquals(5,cb.processed); Assert.assertEquals(5, cb.processed);
} }
@Test @Test
public void testWaitingProcessFailure() throws Exception public void testWaitingProcessFailure() throws Exception
{ {
TestCB cb=new TestCB() TestCB cb = new TestCB()
{ {
int i=4; int i = 4;
@Override @Override
protected Action process() throws Exception protected Action process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-- > 1)
{ {
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(i > 2 ? successTask : failTask, 50, TimeUnit.MILLISECONDS);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
}; };
cb.iterate(); cb.iterate();
Assert.assertFalse(cb.waitForComplete()); Assert.assertFalse(cb.waitForComplete());
Assert.assertEquals(2,cb.processed); Assert.assertEquals(2, cb.processed);
} }
@Test @Test
public void testIdleWaiting() throws Exception public void testIdleWaiting() throws Exception
{ {
final CountDownLatch idle = new CountDownLatch(1); final CountDownLatch idle = new CountDownLatch(1);
TestCB cb=new TestCB() TestCB cb = new TestCB()
{ {
int i=5; int i = 5;
@Override @Override
protected Action process() protected Action process()
{ {
processed++; processed++;
switch(i--) switch (i--)
{ {
case 5: case 5:
succeeded(); succeeded();
return Action.SCHEDULED; return Action.SCHEDULED;
case 4: case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS);
return Action.SCHEDULED; return Action.SCHEDULED;
case 3: case 3:
scheduler.schedule(new Runnable() scheduler.schedule(new Runnable()
{ {
@ -224,49 +222,99 @@ public class IteratingCallbackTest
{ {
idle.countDown(); idle.countDown();
} }
},5,TimeUnit.MILLISECONDS); }, 5, TimeUnit.MILLISECONDS);
return Action.IDLE; return Action.IDLE;
case 2: case 2:
succeeded(); succeeded();
return Action.SCHEDULED; return Action.SCHEDULED;
case 1: case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS);
return Action.SCHEDULED; return Action.SCHEDULED;
case 0: case 0:
return Action.SUCCEEDED; return Action.SUCCEEDED;
default: default:
throw new IllegalStateException(); throw new IllegalStateException();
} }
} }
}; };
cb.iterate(); cb.iterate();
idle.await(10,TimeUnit.SECONDS); idle.await(10, TimeUnit.SECONDS);
Assert.assertTrue(cb.isIdle()); Assert.assertTrue(cb.isIdle());
cb.iterate(); cb.iterate();
Assert.assertTrue(cb.waitForComplete()); Assert.assertTrue(cb.waitForComplete());
Assert.assertEquals(6,cb.processed); Assert.assertEquals(6, cb.processed);
} }
@Test
public void testCloseDuringProcessingReturningScheduled() throws Exception
{
testCloseDuringProcessing(IteratingCallback.Action.SCHEDULED);
}
@Test
public void testCloseDuringProcessingReturningSucceeded() throws Exception
{
testCloseDuringProcessing(IteratingCallback.Action.SUCCEEDED);
}
private void testCloseDuringProcessing(final IteratingCallback.Action action) throws Exception
{
final CountDownLatch failureLatch = new CountDownLatch(1);
IteratingCallback callback = new IteratingCallback()
{
@Override
protected Action process() throws Exception
{
close();
return action;
}
@Override
protected void onCompleteFailure(Throwable cause)
{
failureLatch.countDown();
}
};
callback.iterate();
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
private abstract static class TestCB extends IteratingCallback private abstract static class TestCB extends IteratingCallback
{ {
CountDownLatch completed = new CountDownLatch(1); protected Runnable successTask = new Runnable()
int processed=0; {
@Override
public void run()
{
succeeded();
}
};
protected Runnable failTask = new Runnable()
{
@Override
public void run()
{
failed(new Exception("testing failure"));
}
};
protected CountDownLatch completed = new CountDownLatch(1);
protected int processed = 0;
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
completed.countDown(); completed.countDown();
} }
@Override @Override
public void onCompleteFailure(Throwable x) public void onCompleteFailure(Throwable x)
{ {
@ -275,26 +323,8 @@ public class IteratingCallbackTest
boolean waitForComplete() throws InterruptedException boolean waitForComplete() throws InterruptedException
{ {
completed.await(10,TimeUnit.SECONDS); completed.await(10, TimeUnit.SECONDS);
return isSucceeded(); return isSucceeded();
} }
Runnable successTask = new Runnable()
{
@Override
public void run()
{
succeeded();
}
};
Runnable failTask = new Runnable()
{
@Override
public void run()
{
failed(new Exception("testing failure"));
}
};
} }
} }