Merge remote-tracking branch 'origin/master' into jetty-http2

Conflicts:
	jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
This commit is contained in:
Greg Wilkins 2014-09-03 15:19:54 +10:00
commit 5be1fb6270
4 changed files with 124 additions and 33 deletions

View File

@ -222,6 +222,7 @@ public class ProxyServletTest
.timeout(5, TimeUnit.SECONDS) .timeout(5, TimeUnit.SECONDS)
.send(); .send();
Assert.assertEquals("OK", response.getReason());
Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(response.getHeaders().containsKey(PROXIED_HEADER)); Assert.assertTrue(response.getHeaders().containsKey(PROXIED_HEADER));
} }

View File

@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest; import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse; import javax.servlet.ServletResponse;
@ -811,12 +811,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch(_state.get()) switch(_state.get())
{ {
case READY:
case CLOSED: case CLOSED:
{
// even though a write is not possible, because a close has // even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async // occurred, we need to call onWritePossible to tell async
// producer that the last write completed. // producer that the last write completed.
// so fall through
case READY:
try try
{ {
_writeListener.onWritePossible(); _writeListener.onWritePossible();
@ -827,11 +827,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_onError = e; _onError = e;
} }
break; break;
}
default: default:
{ _onError=new IllegalStateException("state="+_state.get());
break;
}
} }
} }
} }
@ -876,7 +874,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void onCompleteFailure(Throwable e) public void onCompleteFailure(Throwable e)
{ {
_onError=e; _onError=e==null?new IOException():e;
_channel.getState().onWritePossible(); _channel.getState().onWritePossible();
} }
} }

View File

@ -48,7 +48,7 @@ import org.eclipse.jetty.util.thread.NonBlockingThread;
*/ */
public class SharedBlockingCallback public class SharedBlockingCallback
{ {
private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); static final Logger LOG = Log.getLogger(SharedBlockingCallback.class);
final ReentrantLock _lock = new ReentrantLock(); final ReentrantLock _lock = new ReentrantLock();
final Condition _idle = _lock.newCondition(); final Condition _idle = _lock.newCondition();
@ -128,6 +128,12 @@ public class SharedBlockingCallback
return _blocker; return _blocker;
} }
protected void notComplete(Blocker blocker)
{
LOG.warn("Blocker not complete {}",blocker);
if (LOG.isDebugEnabled())
LOG.debug(new Throwable());
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** A Closeable Callback. /** A Closeable Callback.
@ -152,8 +158,8 @@ public class SharedBlockingCallback
_state = SUCCEEDED; _state = SUCCEEDED;
_complete.signalAll(); _complete.signalAll();
} }
else if (_state == IDLE) else
throw new IllegalStateException("IDLE"); throw new IllegalStateException(_state);
} }
finally finally
{ {
@ -169,11 +175,17 @@ public class SharedBlockingCallback
{ {
if (_state == null) if (_state == null)
{ {
_state = cause==null?FAILED:cause; if (cause==null)
_state=FAILED;
else if (cause instanceof BlockerTimeoutException)
// Not this blockers timeout
_state=new IOException(cause);
else
_state=cause;
_complete.signalAll(); _complete.signalAll();
} }
else if (_state == IDLE) else
throw new IllegalStateException("IDLE",cause); throw new IllegalStateException(_state);
} }
finally finally
{ {
@ -202,15 +214,9 @@ public class SharedBlockingCallback
if (idle>0) if (idle>0)
{ {
if (!_complete.await(idle,TimeUnit.MILLISECONDS)) if (!_complete.await(idle,TimeUnit.MILLISECONDS))
{
// The callback has not arrived in sufficient time. // The callback has not arrived in sufficient time.
// We will synthesize a TimeoutException and then // We will synthesize a TimeoutException
// create a new Blocker, so that any late arriving callback _state=new BlockerTimeoutException();
// does not cause a problem with the next cycle.
_state=new TimeoutException("No Blocker CB");
LOG.warn(_state);
_blocker=new Blocker();
}
} }
else else
_complete.await(); _complete.await();
@ -260,16 +266,18 @@ public class SharedBlockingCallback
if (_state == IDLE) if (_state == IDLE)
throw new IllegalStateException("IDLE"); throw new IllegalStateException("IDLE");
if (_state == null) if (_state == null)
{ notComplete(this);
LOG.warn("Blocker not complete {}",this);
if (LOG.isDebugEnabled())
LOG.debug(new Throwable());
}
} }
finally finally
{ {
try try
{ {
// If the blocker timed itself out, remember the state
if (_state instanceof BlockerTimeoutException)
// and create a new Blocker
_blocker=new Blocker();
else
// else reuse Blocker
_state = IDLE; _state = IDLE;
_idle.signalAll(); _idle.signalAll();
_complete.signalAll(); _complete.signalAll();
@ -295,4 +303,8 @@ public class SharedBlockingCallback
} }
} }
} }
private class BlockerTimeoutException extends TimeoutException
{
}
} }

View File

@ -18,9 +18,17 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -29,7 +37,23 @@ import org.junit.Test;
public class SharedBlockingCallbackTest public class SharedBlockingCallbackTest
{ {
final SharedBlockingCallback sbcb= new SharedBlockingCallback(); final AtomicInteger notComplete = new AtomicInteger();
final SharedBlockingCallback sbcb= new SharedBlockingCallback()
{
@Override
protected long getIdleTimeout()
{
return 150;
}
@Override
protected void notComplete(Blocker blocker)
{
super.notComplete(blocker);
notComplete.incrementAndGet();
}
};
public SharedBlockingCallbackTest() public SharedBlockingCallbackTest()
{ {
@ -47,6 +71,7 @@ public class SharedBlockingCallbackTest
blocker.block(); blocker.block();
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
Assert.assertEquals(0,notComplete.get());
} }
@Test @Test
@ -74,6 +99,7 @@ public class SharedBlockingCallbackTest
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertEquals(0,notComplete.get());
} }
@Test @Test
@ -96,6 +122,7 @@ public class SharedBlockingCallbackTest
Assert.assertEquals(ex,ee.getCause()); Assert.assertEquals(ex,ee.getCause());
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
Assert.assertEquals(0,notComplete.get());
} }
@Test @Test
@ -133,6 +160,7 @@ public class SharedBlockingCallbackTest
} }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
Assert.assertEquals(0,notComplete.get());
} }
@ -175,5 +203,57 @@ public class SharedBlockingCallbackTest
blocker.block(); blocker.block();
}; };
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
Assert.assertEquals(0,notComplete.get());
}
@Test
public void testBlockerClose() throws Exception
{
try (Blocker blocker=sbcb.acquire())
{
SharedBlockingCallback.LOG.info("Blocker not complete "+blocker+" warning is expected...");
}
Assert.assertEquals(1,notComplete.get());
}
@Test
public void testBlockerTimeout() throws Exception
{
Blocker b0=null;
try
{
try (Blocker blocker=sbcb.acquire())
{
b0=blocker;
Thread.sleep(400);
blocker.block();
}
fail();
}
catch(IOException e)
{
Throwable cause = e.getCause();
assertThat(cause,instanceOf(TimeoutException.class));
}
Assert.assertEquals(0,notComplete.get());
try (Blocker blocker=sbcb.acquire())
{
assertThat(blocker,not(equalTo(b0)));
try
{
b0.succeeded();
fail();
}
catch(Exception e)
{
assertThat(e,instanceOf(IllegalStateException.class));
assertThat(e.getCause(),instanceOf(TimeoutException.class));
}
blocker.succeeded();
}
} }
} }