From 0a2aeb54a1c2ad221cee2edb9b6f0f6960a10e6e Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 3 Sep 2014 10:46:18 +1000 Subject: [PATCH 1/3] 435322 Added a idleTimeout to the SharedBlockerCallback --- .../jetty/util/SharedBlockingCallback.java | 52 ++++++----- .../util/SharedBlockingCallbackTest.java | 88 ++++++++++++++++++- 2 files changed, 116 insertions(+), 24 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index 602e48463c1..168a19d8b86 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -48,7 +48,7 @@ import org.eclipse.jetty.util.thread.NonBlockingThread; */ public class SharedBlockingCallback { - private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); + static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); final ReentrantLock _lock = new ReentrantLock(); final Condition _idle = _lock.newCondition(); @@ -128,6 +128,12 @@ public class SharedBlockingCallback return _blocker; } + protected void notComplete(Blocker blocker) + { + LOG.warn("Blocker not complete {}",blocker); + if (LOG.isDebugEnabled()) + LOG.debug(new Throwable()); + } /* ------------------------------------------------------------ */ /** A Closeable Callback. @@ -152,8 +158,8 @@ public class SharedBlockingCallback _state = SUCCEEDED; _complete.signalAll(); } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); + else + throw new IllegalStateException(_state); } finally { @@ -169,11 +175,17 @@ public class SharedBlockingCallback { if (_state == null) { - _state = cause==null?FAILED:cause; + if (cause==null) + _state=FAILED; + else if (cause instanceof BlockerTimeoutException) + // Not this blockers timeout + _state=new IOException(cause); + else + _state=cause; _complete.signalAll(); } - else if (_state == IDLE) - throw new IllegalStateException("IDLE",cause); + else + throw new IllegalStateException(_state); } finally { @@ -202,15 +214,9 @@ public class SharedBlockingCallback if (idle>0) { if (!_complete.await(idle,TimeUnit.MILLISECONDS)) - { // The callback has not arrived in sufficient time. - // We will synthesize a TimeoutException and then - // create a new Blocker, so that any late arriving callback - // does not cause a problem with the next cycle. - _state=new TimeoutException("No Blocker CB"); - LOG.warn(_state); - _blocker=new Blocker(); - } + // We will synthesize a TimeoutException + _state=new BlockerTimeoutException(); } else _complete.await(); @@ -260,17 +266,19 @@ public class SharedBlockingCallback if (_state == IDLE) throw new IllegalStateException("IDLE"); if (_state == null) - { - LOG.warn("Blocker not complete {}",this); - if (LOG.isDebugEnabled()) - LOG.debug(new Throwable()); - } + notComplete(this); } finally { try { - _state = IDLE; + // If the blocker timed itself out, remember the state + if (_state instanceof BlockerTimeoutException) + // and create a new Blocker + _blocker=new Blocker(); + else + // else reuse Blocker + _state = IDLE; _idle.signalAll(); _complete.signalAll(); } @@ -295,4 +303,8 @@ public class SharedBlockingCallback } } } + + private class BlockerTimeoutException extends TimeoutException + { + } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 4ee93a6b19b..c06e5070d1c 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -18,9 +18,17 @@ package org.eclipse.jetty.util; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.hamcrest.Matchers; @@ -29,7 +37,23 @@ import org.junit.Test; public class SharedBlockingCallbackTest { - final SharedBlockingCallback sbcb= new SharedBlockingCallback(); + final AtomicInteger notComplete = new AtomicInteger(); + final SharedBlockingCallback sbcb= new SharedBlockingCallback() + { + @Override + protected long getIdleTimeout() + { + return 150; + } + + @Override + protected void notComplete(Blocker blocker) + { + super.notComplete(blocker); + notComplete.incrementAndGet(); + } + + }; public SharedBlockingCallbackTest() { @@ -46,7 +70,8 @@ public class SharedBlockingCallbackTest start=System.currentTimeMillis(); blocker.block(); } - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -74,6 +99,7 @@ public class SharedBlockingCallbackTest } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -95,7 +121,8 @@ public class SharedBlockingCallbackTest start=System.currentTimeMillis(); Assert.assertEquals(ex,ee.getCause()); } - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -133,6 +160,7 @@ public class SharedBlockingCallbackTest } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + Assert.assertEquals(0,notComplete.get()); } @@ -174,6 +202,58 @@ public class SharedBlockingCallbackTest blocker.succeeded(); blocker.block(); }; - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); + Assert.assertEquals(0,notComplete.get()); + } + + @Test + public void testBlockerClose() throws Exception + { + try (Blocker blocker=sbcb.acquire()) + { + SharedBlockingCallback.LOG.info("Blocker not complete "+blocker+" warning is expected..."); + } + + Assert.assertEquals(1,notComplete.get()); + } + + @Test + public void testBlockerTimeout() throws Exception + { + Blocker b0=null; + try + { + try (Blocker blocker=sbcb.acquire()) + { + b0=blocker; + Thread.sleep(400); + blocker.block(); + } + fail(); + } + catch(IOException e) + { + Throwable cause = e.getCause(); + assertThat(cause,instanceOf(TimeoutException.class)); + } + + Assert.assertEquals(0,notComplete.get()); + + + try (Blocker blocker=sbcb.acquire()) + { + assertThat(blocker,not(equalTo(b0))); + try + { + b0.succeeded(); + fail(); + } + catch(Exception e) + { + assertThat(e,instanceOf(IllegalStateException.class)); + assertThat(e.getCause(),instanceOf(TimeoutException.class)); + } + blocker.succeeded(); + } } } From e03b5114156ec482f21c7009fc976447de63f89b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 3 Sep 2014 15:00:40 +1000 Subject: [PATCH 2/3] trying to catch intermittent test failure --- .../src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java index 40d99b06e66..bcbc1066513 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java @@ -222,6 +222,7 @@ public class ProxyServletTest .timeout(5, TimeUnit.SECONDS) .send(); + Assert.assertEquals("OK", response.getReason()); Assert.assertEquals(200, response.getStatus()); Assert.assertTrue(response.getHeaders().containsKey(PROXIED_HEADER)); } From c3647f9d9f64e396bc0c5c99f4d9b7103dff386b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 3 Sep 2014 15:01:10 +1000 Subject: [PATCH 3/3] 443158 Fixed HttpOutput spin --- .../java/org/eclipse/jetty/server/HttpOutput.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 122f2935295..4454f01e43a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritePendingException; import java.util.concurrent.atomic.AtomicReference; + import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -777,14 +778,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable } continue; } - + switch(_state.get()) { - case READY: case CLOSED: // even though a write is not possible, because a close has // occurred, we need to call onWritePossible to tell async // producer that the last write completed. + // so fall through + case READY: try { _writeListener.onWritePossible(); @@ -795,8 +797,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable _onError=e; } break; + default: - + _onError=new IllegalStateException("state="+_state.get()); } } } @@ -841,7 +844,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void onCompleteFailure(Throwable e) { - _onError=e; + _onError=e==null?new IOException():e; _channel.getState().onWritePossible(); } }