diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index e8ea548547a..0837516f38c 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -481,7 +481,7 @@ public class HttpGenerator } catch(BufferOverflowException e) { - throw new BadMessageException(INTERNAL_SERVER_ERROR_500,"Request header too large",e); + throw new BadMessageException(INTERNAL_SERVER_ERROR_500,"Response header too large",e); } catch(Exception e) { diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/CookieCutterTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/CookieCutterTest.java index 1a7a8b865ce..c3fb6be2a0d 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/CookieCutterTest.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/CookieCutterTest.java @@ -23,12 +23,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +@RunWith(AdvancedRunner.class) public class CookieCutterTest { private Cookie[] parseCookieHeaders(CookieCompliance compliance,String... headers) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java index 7882518dcb6..c139924cbd8 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; @@ -36,6 +37,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -43,11 +45,17 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.thread.Invocable.InvocationType; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -422,7 +430,7 @@ public class IdleTimeoutTest extends AbstractTest } @Test - public void testStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception + public void testServerStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception { final CountDownLatch timeoutLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() @@ -468,10 +476,11 @@ public class IdleTimeoutTest extends AbstractTest { return InvocationType.NON_BLOCKING; } + @Override public void succeeded() { - // Idle timeout should not fire while receiving. + // Idle timeout should not fire while the server is receiving. Assert.assertEquals(1, timeoutLatch.getCount()); dataLatch.countDown(); } @@ -485,7 +494,7 @@ public class IdleTimeoutTest extends AbstractTest } @Test - public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception + public void testClientStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception { final CountDownLatch resetLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() @@ -589,6 +598,83 @@ public class IdleTimeoutTest extends AbstractTest Assert.assertTrue(latch.await(2 * (contentLength / bufferSize + 1) * delay, TimeUnit.MILLISECONDS)); } + @Test + public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception + { + long idleTimeout = 2000; + // Use a small thread pool to cause request queueing. + QueuedThreadPool serverExecutor = new QueuedThreadPool(4); + serverExecutor.setName("server"); + server = new Server(serverExecutor); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setStreamIdleTimeout(idleTimeout); + connector = new ServerConnector(server, 1, 1, h2); + connector.setIdleTimeout(10 * idleTimeout); + server.addConnector(connector); + ServletContextHandler context = new ServletContextHandler(server, "/", true, false); + AtomicReference phaser = new AtomicReference<>(); + context.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + phaser.get().countDown(); + + // Hold the dispatched requests enough for the idle requests to idle timeout. + sleep(2 * idleTimeout); + } + }), servletPath + "/*"); + server.start(); + + prepareClient(); + client.start(); + + Session client = newClient(new Session.Listener.Adapter()); + + // Send requests until one is queued on the server but not dispatched. + while (true) + { + phaser.set(new CountDownLatch(1)); + + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(10); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + if (!phaser.get().await(1, TimeUnit.SECONDS)) + break; + } + + // Send one more request to consume the whole session flow control window. + CountDownLatch resetLatch = new CountDownLatch(1); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0)); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + Assert.assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + + // Wait for WINDOW_UPDATEs to be processed by the client. + sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } + private void sleep(long value) { try diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 90727d255a7..56c8adfdd19 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -24,7 +24,9 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +41,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; @@ -51,13 +54,22 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -242,7 +254,7 @@ public class StreamResetTest extends AbstractTest response.setStatus(200); response.setContentType("text/plain;charset=" + charset.name()); - response.setContentLength(data.length*10); + response.setContentLength(data.length * 10); response.flushBuffer(); try @@ -259,7 +271,7 @@ public class StreamResetTest extends AbstractTest { // Write some content after the stream has // been reset, it should throw an exception. - for (int i=0;i<10;i++) + for (int i = 0; i < 10; i++) { Thread.sleep(500); response.getOutputStream().write(data); @@ -407,6 +419,106 @@ public class StreamResetTest extends AbstractTest Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); } + @Test + public void testClientResetConsumesQueuedRequestWithData() throws Exception + { + // Use a small thread pool. + QueuedThreadPool serverExecutor = new QueuedThreadPool(4); + serverExecutor.setName("server"); + serverExecutor.setDetailedDump(true); + server = new Server(serverExecutor); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + connector = new ServerConnector(server, 1, 1, h2); + server.addConnector(connector); + ServletContextHandler context = new ServletContextHandler(server, "/"); + AtomicReference phaser = new AtomicReference<>(); + context.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI()); + phaser.get().countDown(); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }), servletPath + "/*"); + server.start(); + + prepareClient(); + client.start(); + + Session client = newClient(new Session.Listener.Adapter()); + + // Send requests until one is queued on the server but not dispatched. + AtomicReference latch = new AtomicReference<>(); + List streams = new ArrayList<>(); + while (true) + { + phaser.set(new CountDownLatch(1)); + + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData()); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (response.getStatus() == HttpStatus.OK_200) + latch.get().countDown(); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame); + callback.succeeded(); + if (frame.isEndStream()) + latch.get().countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + streams.add(stream); + ByteBuffer data = ByteBuffer.allocate(10); + stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP); + + if (!phaser.get().await(1, TimeUnit.SECONDS)) + break; + } + + // Send one more request to consume the whole session flow control window, then reset it. + MetaData.Request request = newRequest("GET", "/x", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + // This request will get no event from the server since it's reset by the client. + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0)); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), NOOP); + } + }); + + // Wait for WINDOW_UPDATEs to be processed by the client. + Thread.sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + + latch.set(new CountDownLatch(2 * streams.size())); + // Complete all streams. + streams.forEach(s -> s.data(new DataFrame(s.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP)); + + Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS)); + } + @Test public void testServerExceptionConsumesQueuedData() throws Exception { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 39350c2e70b..881e0b6ae14 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http2.parser.Parser; @@ -248,6 +247,12 @@ public class HTTP2Connection extends AbstractConnection buffer = null; } } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } } private class FillableCallback implements Callback @@ -267,7 +272,7 @@ public class HTTP2Connection extends AbstractConnection @Override public InvocationType getInvocationType() { - return InvocationType.EITHER; + return InvocationType.NON_BLOCKING; } } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index b0e6c8d2dbe..8596c7e8720 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -49,7 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private long streamIdleTimeout; - private int reservedThreads = -1; + private int reservedThreads; public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -154,7 +154,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne public void setReservedThreads(int threads) { - this.reservedThreads = threads; + // TODO: currently disabled since the only value that works is 0. +// this.reservedThreads = threads; } public HttpConfiguration getHttpConfiguration() @@ -193,7 +194,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne try { - executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads()); + executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads()); executor.start(); connector.addBean(executor,true); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index e3092db044d..8e51c970319 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -25,7 +25,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http.BadMessageException; @@ -223,7 +222,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection { HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); if (channel != null) - result &= !channel.isRequestHandled(); + result &= !channel.isRequestExecuting(); } if (LOG.isDebugEnabled()) LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 6487577ba87..bceb73376ce 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -55,7 +55,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable private boolean _expect100Continue; private boolean _delayedUntilContent; - private boolean _handled; public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { @@ -123,7 +122,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable _delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() && !endStream && !_expect100Continue; - _handled = !_delayedUntilContent; if (LOG.isDebugEnabled()) { @@ -192,7 +190,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable { _expect100Continue = false; _delayedUntilContent = false; - _handled = false; super.recycle(); getHttpTransport().recycle(); } @@ -279,8 +276,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; - if (wasDelayed) - _handled = true; return handle || wasDelayed ? this : null; } @@ -302,35 +297,31 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; - if (wasDelayed) - _handled = true; return handle || wasDelayed ? this : null; } - public boolean isRequestHandled() + public boolean isRequestExecuting() { - return _handled; + return !getState().isIdle(); } public boolean onStreamTimeout(Throwable failure) { - if (!_handled) - return true; - - HttpInput input = getRequest().getHttpInput(); - boolean readFailed = input.failed(failure); - if (readFailed) + getHttpTransport().onStreamTimeout(failure); + if (getRequest().getHttpInput().onIdleTimeout(failure)) handle(); - boolean writeFailed = getHttpTransport().onStreamTimeout(failure); + if (isRequestExecuting()) + return false; - return readFailed || writeFailed; + consumeInput(); + return true; } public void onFailure(Throwable failure) { getHttpTransport().onStreamFailure(failure); - if (onEarlyEOF()) + if (getRequest().getHttpInput().failed(failure)) { ContextHandler handler = getState().getContextHandler(); if (handler != null) @@ -342,6 +333,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable { getState().asyncError(failure); } + consumeInput(); } protected void consumeInput() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index ea90b4a8246..4b663413097 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -39,7 +39,6 @@ public abstract class FillInterest { private final static Logger LOG = Log.getLogger(FillInterest.class); private final AtomicReference _interested = new AtomicReference<>(null); - private Throwable _lastSet; protected FillInterest() { @@ -58,8 +57,6 @@ public abstract class FillInterest if (!tryRegister(callback)) { LOG.warn("Read pending for {} prevented {}", _interested, callback); - if (LOG.isDebugEnabled()) - LOG.warn("callback set at ",_lastSet); throw new ReadPendingException(); } } @@ -81,10 +78,7 @@ public abstract class FillInterest return false; if (LOG.isDebugEnabled()) - { - LOG.debug("{} register {}",this,callback); - _lastSet=new Throwable(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + Thread.currentThread().getName()); - } + LOG.debug("interested {}",this); try { @@ -103,9 +97,9 @@ public abstract class FillInterest */ public void fillable() { - Callback callback = _interested.get(); if (LOG.isDebugEnabled()) - LOG.debug("{} fillable {}",this,callback); + LOG.debug("fillable {}",this); + Callback callback = _interested.get(); if (callback != null && _interested.compareAndSet(callback, null)) callback.succeeded(); else if (LOG.isDebugEnabled()) @@ -134,6 +128,8 @@ public abstract class FillInterest */ public boolean onFail(Throwable cause) { + if (LOG.isDebugEnabled()) + LOG.debug("onFail {} {}",this,cause); Callback callback = _interested.get(); if (callback != null && _interested.compareAndSet(callback, null)) { @@ -145,9 +141,9 @@ public abstract class FillInterest public void onClose() { - Callback callback = _interested.get(); if (LOG.isDebugEnabled()) - LOG.debug("{} onClose {}",this,callback); + LOG.debug("onClose {}",this); + Callback callback = _interested.get(); if (callback != null && _interested.compareAndSet(callback, null)) callback.failed(new ClosedChannelException()); } @@ -155,7 +151,7 @@ public abstract class FillInterest @Override public String toString() { - return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get()); + return String.format("FillInterest@%x{%s}", hashCode(), _interested.get()); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 1af25a979f4..07d3b71bdc0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -281,10 +281,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable if (task != null) return task; } - else if (key.isAcceptable()) - { - processAccept(key); - } else { throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps()); @@ -328,6 +324,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable if (attachment instanceof Selectable) ((Selectable)attachment).updateKey(); } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } } private abstract static class NonBlockingAction implements Runnable, Invocable @@ -380,27 +382,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - private void processAccept(SelectionKey key) - { - SelectableChannel server = key.channel(); - SelectableChannel channel = null; - try - { - while(true) - { - channel = _selectorManager.doAccept(server); - if (channel==null) - break; - _selectorManager.accepted(channel); - } - } - catch (Throwable x) - { - closeNoExceptions(channel); - LOG.warn("Accept failed for channel " + channel, x); - } - } - private void closeNoExceptions(Closeable closeable) { try @@ -524,9 +505,10 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable } } - class Acceptor extends NonBlockingAction + class Acceptor extends NonBlockingAction implements Selectable, Closeable { private final SelectableChannel _channel; + private SelectionKey _key; public Acceptor(SelectableChannel channel) { @@ -538,9 +520,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable { try { - SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, "Acceptor"); + if (_key==null) + { + _key = _channel.register(_selector, SelectionKey.OP_ACCEPT, this); + } + + if (LOG.isDebugEnabled()) - LOG.debug("{} acceptor={}", this, key); + LOG.debug("{} acceptor={}", this, _key); } catch (Throwable x) { @@ -548,6 +535,44 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable LOG.warn(x); } } + + @Override + public Runnable onSelected() + { + SelectableChannel server = _key.channel(); + SelectableChannel channel = null; + try + { + while(true) + { + channel = _selectorManager.doAccept(server); + if (channel==null) + break; + _selectorManager.accepted(channel); + } + } + catch (Throwable x) + { + closeNoExceptions(channel); + LOG.warn("Accept failed for channel " + channel, x); + } + + return null; + } + + @Override + public void updateKey() + { + } + + @Override + public void close() throws IOException + { + SelectionKey key = _key; + _key = null; + if (key!=null && key.isValid()) + key.cancel(); + } } class Accept extends NonBlockingAction implements Closeable diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index d614e7bc8dd..e99114fa18b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.io; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; @@ -267,11 +268,14 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump * overridden by a derivation of this class to handle the accepted channel * * @param server the server channel to register + * @return A Closable that allows the acceptor to be cancelled */ - public void acceptor(SelectableChannel server) + public Closeable acceptor(SelectableChannel server) { final ManagedSelector selector = chooseSelector(null); - selector.submit(selector.new Acceptor(server)); + ManagedSelector.Acceptor acceptor = selector.new Acceptor(server); + selector.submit(acceptor); + return acceptor; } /** @@ -435,4 +439,5 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump */ public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException; + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 232f96ac97f..ec8b1e85237 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; @@ -48,6 +50,7 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; @@ -137,8 +140,10 @@ import org.eclipse.jetty.util.thread.Scheduler; public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable { protected final Logger LOG = Log.getLogger(AbstractConnector.class); - // Order is important on server side, so we use a LinkedHashMap - private final Map _factories = new LinkedHashMap<>(); + + private final Locker _locker = new Locker(); + private final Condition _setAccepting = _locker.newCondition(); + private final Map _factories = new LinkedHashMap<>(); // Order is important on server side, so we use a LinkedHashMap private final Server _server; private final Executor _executor; private final Scheduler _scheduler; @@ -146,12 +151,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private final Thread[] _acceptors; private final Set _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set _immutableEndPoints = Collections.unmodifiableSet(_endpoints); - private volatile CountDownLatch _stopping; + private CountDownLatch _stopping; private long _idleTimeout = 30000; private String _defaultProtocol; private ConnectionFactory _defaultConnectionFactory; private String _name; private int _acceptorPriorityDelta=-2; + private boolean _accepting = true; /** @@ -222,7 +228,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co { return _idleTimeout; } - + /** *

Sets the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)} * call, although with NIO implementations other mechanisms may be used to implement the timeout.

@@ -283,7 +289,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co protected void interruptAcceptors() { - synchronized (this) + try (Locker.Lock lock = _locker.lockIfNotHeld()) { for (Thread thread : _acceptors) { @@ -327,7 +333,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void join(long timeout) throws InterruptedException { - synchronized (this) + try (Locker.Lock lock = _locker.lock()) { for (Thread thread : _acceptors) if (thread != null) @@ -338,19 +344,31 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co protected abstract void accept(int acceptorID) throws IOException, InterruptedException; - /* ------------------------------------------------------------ */ /** * @return Is the connector accepting new connections */ - protected boolean isAccepting() + public boolean isAccepting() { - return isRunning(); + try (Locker.Lock lock = _locker.lock()) + { + return _accepting; + } } + public void setAccepting(boolean accepting) + { + try (Locker.Lock lock = _locker.lock()) + { + _accepting=accepting; + _setAccepting.signalAll(); + } + } + + @Override public ConnectionFactory getConnectionFactory(String protocol) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { return _factories.get(StringUtil.asciiToLowerCase(protocol)); } @@ -359,7 +377,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override public T getConnectionFactory(Class factoryType) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { for (ConnectionFactory f : _factories.values()) if (factoryType.isAssignableFrom(f.getClass())) @@ -370,7 +388,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void addConnectionFactory(ConnectionFactory factory) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lockIfNotHeld()) { Set to_remove = new HashSet<>(); for (String key:factory.getProtocols()) @@ -409,7 +427,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void addFirstConnectionFactory(ConnectionFactory factory) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { List existings = new ArrayList<>(_factories.values()); _factories.clear(); @@ -422,7 +440,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void addIfAbsentConnectionFactory(ConnectionFactory factory) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { String key=StringUtil.asciiToLowerCase(factory.getProtocol()); if (_factories.containsKey(key)) @@ -444,7 +462,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public ConnectionFactory removeConnectionFactory(String protocol) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol)); removeBean(factory); @@ -455,7 +473,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override public Collection getConnectionFactories() { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { return _factories.values(); } @@ -463,7 +481,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void setConnectionFactories(Collection factories) { - synchronized (_factories) + try (Locker.Lock lock = _locker.lock()) { List existing = new ArrayList<>(_factories.values()); for (ConnectionFactory factory: existing) @@ -538,14 +556,23 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co return getConnectionFactory(_defaultProtocol); } - protected boolean handleAcceptFailure(Throwable previous, Throwable current) + protected boolean handleAcceptFailure(Throwable ex) { - if (isAccepting()) + if (isRunning()) { - if (previous == null) - LOG.warn(current); - else - LOG.debug(current); + if (ex instanceof InterruptedException) + { + LOG.debug(ex); + return true; + } + + if (ex instanceof ClosedByInterruptException) + { + LOG.debug(ex); + return false; + } + + LOG.warn(ex); try { // Arbitrary sleep to avoid spin looping. @@ -556,12 +583,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co } catch (Throwable x) { - return false; + LOG.ignore(x); } + return false; } else { - LOG.ignore(current); + LOG.ignore(ex); return false; } } @@ -595,19 +623,28 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co try { - Throwable exception = null; - while (isAccepting()) + while (isRunning()) { + try (Locker.Lock lock = _locker.lock()) + { + if (!_accepting && isRunning()) + { + _setAccepting.await(); + continue; + } + } + catch (InterruptedException e) + { + continue; + } + try { accept(_id); - exception = null; } catch (Throwable x) { - if (handleAcceptFailure(exception, x)) - exception = x; - else + if (!handleAcceptFailure(x)) break; } } @@ -636,12 +673,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co return String.format("acceptor-%d@%x", _id, hashCode()); return name; } - } - - // protected void connectionOpened(Connection connection) // { // _stats.connectionOpened(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java index 76bd05ee2c0..37b2f86e8c2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java @@ -96,8 +96,6 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme @Override public void close() { - // Interrupting is often sufficient to close the channel - interruptAcceptors(); } @@ -107,11 +105,13 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme close(); return super.shutdown(); } - - @Override - protected boolean isAccepting() + + protected boolean handleAcceptFailure(Throwable ex) { - return super.isAccepting() && isOpen(); + if (isOpen()) + return super.handleAcceptFailure(ex); + LOG.ignore(ex); + return false; } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index ba184350f28..a081e80a066 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -99,7 +99,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor _requestLog = connector == null ? null : connector.getServer().getRequestLog(); if (LOG.isDebugEnabled()) - LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state); + LOG.debug("new {} -> {},{},{}", + this, + _endPoint, + _endPoint==null?null:_endPoint.getConnection(), + _state); } protected HttpInput newHttpInput(HttpChannelState state) @@ -258,10 +262,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor _oldIdleTimeout=0; } - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { } + public void onBlockWaitForContent() + { + } + + public void onBlockWaitForContentFailure(Throwable failure) + { + getRequest().getHttpInput().failed(failure); + } + @Override public void run() { @@ -391,7 +404,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor case READ_PRODUCE: { - _request.getHttpInput().produceContent(); + _request.getHttpInput().asyncReadProduce(); break; } @@ -433,7 +446,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor if (hasContent && !_response.isContentComplete(_response.getHttpOutput().getWritten())) { if (isCommitted()) - _transport.abort(new IOException("insufficient content written")); + abort(new IOException("insufficient content written")); else _response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500,"insufficient content written"); } @@ -546,7 +559,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor catch (Throwable x) { failure.addSuppressed(x); - _transport.abort(failure); + abort(failure); } } @@ -843,14 +856,14 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor @Override public void failed(Throwable th) { - _transport.abort(x); + abort(x); super.failed(x); } }); } else { - _transport.abort(x); + abort(x); super.failed(x); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index faff3ecbe5d..59584d3b488 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -246,11 +246,23 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque return handle; } - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { _httpConnection.asyncReadFillInterested(); } + @Override + public void onBlockWaitForContent() + { + _httpConnection.blockingReadFillInterested(); + } + + @Override + public void onBlockWaitForContentFailure(Throwable failure) + { + _httpConnection.blockingReadFailure(failure); + } + @Override public void badMessage(int status, String reason) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 6fbc41a9489..c5929b78c1a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -382,7 +382,7 @@ public class HttpChannelState /** * Signal that the HttpConnection has finished handling the request. - * For blocking connectors,this call may block if the request has + * For blocking connectors, this call may block if the request has * been suspended (startAsync called). * @return next actions * be handled again (eg because of a resume that happened before unhandle was called) @@ -498,7 +498,7 @@ public class HttpChannelState finally { if (read_interested) - _channel.asyncReadFillInterested(); + _channel.onAsyncWaitForContent(); } } @@ -1129,8 +1129,8 @@ public class HttpChannelState /** * Called to signal async read isReady() has returned false. * This indicates that there is no content available to be consumed - * and that once the channel enteres the ASYNC_WAIT state it will - * register for read interest by calling {@link HttpChannel#asyncReadFillInterested()} + * and that once the channel enters the ASYNC_WAIT state it will + * register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()} * either from this method or from a subsequent call to {@link #unhandle()}. */ public void onReadUnready() @@ -1165,7 +1165,7 @@ public class HttpChannelState } if (interested) - _channel.asyncReadFillInterested(); + _channel.onAsyncWaitForContent(); } /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index d2288e25b37..7a9edb09992 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -557,7 +557,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http getEndPoint().fillInterested(_blockingReadCallback); } - public void blockingReadException(Throwable e) + public void blockingReadFailure(Throwable e) { _blockingReadCallback.failed(e); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index dcaae2593f5..7a3daeb31b5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -18,8 +18,8 @@ package org.eclipse.jetty.server; +import java.io.EOFException; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; @@ -133,6 +133,7 @@ public class HttpInput extends ServletInputStream implements Runnable private long _contentArrived; private long _contentConsumed; private long _blockUntil; + private boolean _waitingForContent; private Interceptor _interceptor; public HttpInput(HttpChannelState state) @@ -329,6 +330,18 @@ public class HttpInput extends ServletInputStream implements Runnable protected void produceContent() throws IOException { } + + /** + * Called by channel when asynchronous IO needs to produce more content + * @throws IOException + */ + public void asyncReadProduce() throws IOException + { + synchronized (_inputQ) + { + produceContent(); + } + } /** * Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed. @@ -518,70 +531,47 @@ public class HttpInput extends ServletInputStream implements Runnable /** * Blocks until some content or some end-of-file event arrives. * - * @throws IOException - * if the wait is interrupted + * @throws IOException if the wait is interrupted */ protected void blockForContent() throws IOException { try { + _waitingForContent = true; + _channelState.getHttpChannel().onBlockWaitForContent(); + + boolean loop = false; long timeout = 0; - if (_blockUntil != 0) + while (true) { - timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); - if (timeout <= 0) - throw new TimeoutException(); + if (_blockUntil != 0) + { + timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); + if (timeout <= 0) + throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout())); + } + + // This method is called from a loop, so we just + // need to check the timeout before and after waiting. + if (loop) + break; + + if (LOG.isDebugEnabled()) + LOG.debug("{} blocking for content timeout={}", this, timeout); + if (timeout > 0) + _inputQ.wait(timeout); + else + _inputQ.wait(); + + loop = true; } - - if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={}",this,timeout); - if (timeout > 0) - _inputQ.wait(timeout); - else - _inputQ.wait(); - - // TODO: cannot return unless there is content or timeout, - // TODO: so spurious wakeups are not handled correctly. - - if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0) - throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout())); } - catch (Throwable e) + catch (Throwable x) { - throw (IOException)new InterruptedIOException().initCause(e); + _channelState.getHttpChannel().onBlockWaitForContentFailure(x); } } - /** - * Adds some content to the start of this input stream. - *

- * Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total - *

- * - * @param item - * the content to add - * @return true if content channel woken for read - */ - public boolean prependContent(Content item) - { - boolean woken = false; - synchronized (_inputQ) - { - if (_content != null) - _inputQ.push(_content); - _content = item; - _contentConsumed -= item.remaining(); - if (LOG.isDebugEnabled()) - LOG.debug("{} prependContent {}",this,item); - - if (_listener == null) - _inputQ.notify(); - else - woken = _channelState.onContentAdded(); - } - return woken; - } - /** * Adds some content to this input stream. * @@ -591,31 +581,36 @@ public class HttpInput extends ServletInputStream implements Runnable */ public boolean addContent(Content content) { - boolean woken = false; synchronized (_inputQ) { + _waitingForContent = false; if (_firstByteTimeStamp == -1) _firstByteTimeStamp = System.nanoTime(); - _contentArrived += content.remaining(); - - if (_content==null && _inputQ.isEmpty()) - _content=content; - else - _inputQ.offer(content); - - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}",this,content); - - if (nextInterceptedContent()!=null) + if (isFinished()) { - if (_listener == null) - _inputQ.notify(); + Throwable failure = isError() ? _state.getError() : new EOFException("Content after EOF"); + content.failed(failure); + return false; + } + else + { + _contentArrived += content.remaining(); + + if (_content==null && _inputQ.isEmpty()) + _content=content; else - woken = _channelState.onContentAdded(); + _inputQ.offer(content); + + if (LOG.isDebugEnabled()) + LOG.debug("{} addContent {}",this,content); + + if (nextInterceptedContent()!=null) + return wakeup(); + else + return false; } } - return woken; } public boolean hasContent() @@ -670,13 +665,13 @@ public class HttpInput extends ServletInputStream implements Runnable { try { - while (!isFinished()) + while (true) { Content item = nextContent(); if (item == null) break; // Let's not bother blocking - skip(item,item.remaining()); + skip(item, item.remaining()); } return isFinished() && !isError(); } @@ -713,14 +708,6 @@ public class HttpInput extends ServletInputStream implements Runnable } } - public boolean isAsyncEOF() - { - synchronized (_inputQ) - { - return _state == AEOF; - } - } - @Override public boolean isReady() { @@ -732,10 +719,12 @@ public class HttpInput extends ServletInputStream implements Runnable return true; if (_state instanceof EOFState) return true; + if (_waitingForContent) + return false; if (produceNextContext() != null) return true; - _channelState.onReadUnready(); + _waitingForContent = true; } return false; } @@ -775,6 +764,7 @@ public class HttpInput extends ServletInputStream implements Runnable { _state = ASYNC; _channelState.onReadUnready(); + _waitingForContent = true; } } } @@ -787,18 +777,36 @@ public class HttpInput extends ServletInputStream implements Runnable wake(); } - public boolean failed(Throwable x) + public boolean onIdleTimeout(Throwable x) { - boolean woken = false; synchronized (_inputQ) { - if (_state instanceof ErrorState) + if (_waitingForContent && !isError()) { - // Log both the original and current failure - // without modifying the original failure. - Throwable failure = new Throwable(((ErrorState)_state).getError()); - failure.addSuppressed(x); - LOG.warn(failure); + x.addSuppressed(new Throwable("HttpInput idle timeout")); + _state = new ErrorState(x); + return wakeup(); + } + return false; + } + } + + public boolean failed(Throwable x) + { + synchronized (_inputQ) + { + // Errors may be reported multiple times, for example + // a local idle timeout and a remote I/O failure. + if (isError()) + { + if (LOG.isDebugEnabled()) + { + // Log both the original and current failure + // without modifying the original failure. + Throwable failure = new Throwable(_state.getError()); + failure.addSuppressed(x); + LOG.debug(failure); + } } else { @@ -807,14 +815,16 @@ public class HttpInput extends ServletInputStream implements Runnable x.addSuppressed(new Throwable("HttpInput failure")); _state = new ErrorState(x); } - - if (_listener == null) - _inputQ.notify(); - else - woken = _channelState.onContentAdded(); + return wakeup(); } + } - return woken; + private boolean wakeup() + { + if (_listener != null) + return _channelState.onContentAdded(); + _inputQ.notify(); + return false; } /* @@ -1133,5 +1143,4 @@ public class HttpInput extends ServletInputStream implements Runnable return "AEOF"; } }; - } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 204c0642b03..53fad2119ff 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -32,18 +32,4 @@ public class HttpInputOverHTTP extends HttpInput { ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent(); } - - @Override - protected void blockForContent() throws IOException - { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadFillInterested(); - try - { - super.blockForContent(); - } - catch(Throwable e) - { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadException(e); - } - } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 49072c6858e..20878dd6bea 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -356,6 +356,7 @@ public class Server extends HandlerWrapper implements Attributes setErrorHandler(new ErrorHandler()); if (_errorHandler instanceof ErrorHandler.ErrorPageMapper) LOG.warn("ErrorPageMapper not supported for Server level Error Handling"); + _errorHandler.setServer(this); //If the Server should be stopped when the jvm exits, register //with the shutdown handler thread. diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 6258e171589..1a07288aa2c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.server; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -31,6 +32,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ChannelEndPoint; @@ -79,6 +81,7 @@ import org.eclipse.jetty.util.thread.Scheduler; public class ServerConnector extends AbstractNetworkConnector { private final SelectorManager _manager; + private final AtomicReference _acceptor = new AtomicReference<>(); private volatile ServerSocketChannel _acceptChannel; private volatile boolean _inheritChannel = false; private volatile int _localPort = -1; @@ -237,7 +240,7 @@ public class ServerConnector extends AbstractNetworkConnector if (getAcceptors()==0) { _acceptChannel.configureBlocking(false); - _manager.acceptor(_acceptChannel); + _acceptor.set(_manager.acceptor(_acceptChannel)); } } @@ -344,14 +347,14 @@ public class ServerConnector extends AbstractNetworkConnector @Override public void close() { + super.close(); + ServerSocketChannel serverChannel = _acceptChannel; _acceptChannel = null; - if (serverChannel != null) { removeBean(serverChannel); - // If the interrupt did not close it, we should close it if (serverChannel.isOpen()) { try @@ -364,7 +367,6 @@ public class ServerConnector extends AbstractNetworkConnector } } } - // super.close(); _localPort = -2; } @@ -483,6 +485,38 @@ public class ServerConnector extends AbstractNetworkConnector _reuseAddress = reuseAddress; } + + @Override + public void setAccepting(boolean accepting) + { + super.setAccepting(accepting); + if (getAcceptors()>0) + return; + + try + { + if (accepting) + { + if (_acceptor.get()==null) + { + Closeable acceptor = _manager.acceptor(_acceptChannel); + if (!_acceptor.compareAndSet(null,acceptor)) + acceptor.close(); + } + } + else + { + Closeable acceptor = _acceptor.get(); + if (acceptor!=null && _acceptor.compareAndSet(acceptor,null)) + acceptor.close(); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + protected class ServerConnectorManager extends SelectorManager { public ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java index bb0acbff787..2b7176c4cd0 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java @@ -41,12 +41,15 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.IO; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class AsyncRequestReadTest { private static Server server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/CheckReverseProxyHeadersTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/CheckReverseProxyHeadersTest.java index 318662c6a8b..25e44cfb994 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/CheckReverseProxyHeadersTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/CheckReverseProxyHeadersTest.java @@ -30,11 +30,14 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.Test; +import org.junit.runner.RunWith; /** * */ +@RunWith(AdvancedRunner.class) public class CheckReverseProxyHeadersTest { @Test diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ClassLoaderDumptTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ClassLoaderDumptTest.java index 5e18ec1428c..4cb583f1de2 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ClassLoaderDumptTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ClassLoaderDumptTest.java @@ -27,8 +27,6 @@ import java.net.URL; import java.net.URLClassLoader; import org.eclipse.jetty.util.component.Dumpable; -import org.hamcrest.Matchers; -import org.junit.Assert; import org.junit.Test; public class ClassLoaderDumptTest @@ -185,7 +183,7 @@ public class ClassLoaderDumptTest StringBuilder out = new StringBuilder(); server.dump(out); String dump = out.toString(); - System.err.println(dump); + // System.err.println(dump); assertThat(dump,containsString("+- TopLoader")); assertThat(dump,containsString("| +- file:/ONE")); assertThat(dump,containsString("| +- file:/TWO")); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionOpenCloseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionOpenCloseTest.java index a1e5a86eb74..6fb34bfd1a8 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionOpenCloseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectionOpenCloseTest.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.IO; @@ -46,7 +47,9 @@ import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ConnectionOpenCloseTest extends AbstractHttpTest { public ConnectionOpenCloseTest() diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ErrorHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ErrorHandlerTest.java index 6e8aee030a7..7dfd614392a 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ErrorHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ErrorHandlerTest.java @@ -29,10 +29,13 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ErrorHandlerTest { Server server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java index fa8fc5b6f6a..ee2af29156e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java @@ -38,15 +38,18 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; /** * Extended Server Tester. */ +@RunWith(AdvancedRunner.class) public class ExtendedServerTest extends HttpServerTestBase { @Before diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ForwardedRequestCustomizerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ForwardedRequestCustomizerTest.java index 51dc3e7fc5d..ccbbaac403d 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ForwardedRequestCustomizerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ForwardedRequestCustomizerTest.java @@ -35,12 +35,15 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.IO; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ForwardedRequestCustomizerTest { private Server _server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java index 010023e2288..b8cd789b295 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java @@ -44,12 +44,15 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.IO; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class GracefulStopTest { /** diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HalfCloseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HalfCloseTest.java index d66169ad9ce..737f0b71c2f 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HalfCloseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HalfCloseTest.java @@ -33,9 +33,12 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.IO; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HalfCloseTest { @Test diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HostHeaderCustomizerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HostHeaderCustomizerTest.java index 2f95f49008b..53da777a3dd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HostHeaderCustomizerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HostHeaderCustomizerTest.java @@ -29,11 +29,14 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.TestTracker; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HostHeaderCustomizerTest { @Rule diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java index e20fb8d033e..f419224e7ce 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpConnectionTest.java @@ -49,6 +49,7 @@ import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -58,7 +59,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HttpConnectionTest { private Server server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java index c6facb75090..36e7b9ae461 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java @@ -18,14 +18,6 @@ package org.eclipse.jetty.server; -import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT; -import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.Queue; @@ -35,6 +27,7 @@ import javax.servlet.ReadListener; import org.eclipse.jetty.server.HttpChannelState.Action; import org.eclipse.jetty.server.HttpInput.Content; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; @@ -42,6 +35,15 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT; +import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** @@ -49,6 +51,7 @@ import org.junit.Test; */ +@RunWith(AdvancedRunner.class) public class HttpInputAsyncStateTest { @@ -101,10 +104,11 @@ public class HttpInputAsyncStateTest _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) { @Override - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { - __history.add("asyncReadFillInterested"); + __history.add("onAsyncWaitForContent"); } + @Override public Scheduler getScheduler() { @@ -314,7 +318,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(EOF_CONTENT); check("onReadPossible true"); @@ -352,7 +356,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(EOF_CONTENT); check("onReadPossible true"); @@ -406,7 +410,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(EARLY_EOF_CONTENT); check("onReadPossible true"); @@ -444,7 +448,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(EARLY_EOF_CONTENT); check("onReadPossible true"); @@ -500,7 +504,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -538,7 +542,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -601,7 +605,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EARLY_EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -647,7 +651,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(new TContent("Hello"),EARLY_EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -672,7 +676,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -697,7 +701,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -732,7 +736,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java index 6efcf86a206..843f46c3ca9 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java @@ -26,13 +26,16 @@ import java.util.concurrent.TimeoutException; import javax.servlet.ReadListener; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HttpInputTest { private final Queue _history = new LinkedBlockingQueue<>(); @@ -90,9 +93,9 @@ public class HttpInputTest _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) { @Override - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { - _history.add("asyncReadFillInterested"); + _history.add("asyncReadInterested"); } }) { @@ -210,82 +213,21 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); } - @Test - public void testReRead() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _fillAndParseSimulate.offer("EF"); - _fillAndParseSimulate.offer("GH"); - Assert.assertThat(_in.available(), Matchers.equalTo(2)); - Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); - Assert.assertThat(_in.isReady(), Matchers.equalTo(true)); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'A')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'B')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L)); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'C')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'D')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'E')); - - _in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde"))); - - Assert.assertThat(_in.available(), Matchers.equalTo(5)); - Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); - Assert.assertThat(_in.isReady(), Matchers.equalTo(true)); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'a')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'b')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'c')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'d')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'e')); - - Assert.assertThat(_in.read(), Matchers.equalTo((int)'F')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2")); - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - - Assert.assertThat(_in.read(), Matchers.equalTo((int)'G')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'H')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L)); - - Assert.assertThat(_history.poll(), Matchers.nullValue()); - } - @Test public void testBlockingRead() throws Exception { - new Thread() + new Thread(() -> { - public void run() + try { - try - { - Thread.sleep(500); - _in.addContent(new TContent("AB")); - } - catch (Throwable th) - { - th.printStackTrace(); - } + Thread.sleep(500); + _in.addContent(new TContent("AB")); } - }.start(); + catch (Throwable th) + { + th.printStackTrace(); + } + }).start(); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A')); @@ -364,21 +306,18 @@ public class HttpInputTest @Test public void testBlockingEOF() throws Exception { - new Thread() + new Thread(() -> { - public void run() + try { - try - { - Thread.sleep(500); - _in.eof(); - } - catch (Throwable th) - { - th.printStackTrace(); - } + Thread.sleep(500); + _in.eof(); } - }.start(); + catch (Throwable th) + { + th.printStackTrace(); + } + }).start(); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); Assert.assertThat(_in.read(), Matchers.equalTo(-1)); @@ -398,13 +337,9 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_in.isReady(), Matchers.equalTo(false)); - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0")); - Assert.assertThat(_history.poll(), Matchers.equalTo("s.onReadUnready")); Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_in.isReady(), Matchers.equalTo(false)); - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0")); - Assert.assertThat(_history.poll(), Matchers.equalTo("s.onReadUnready")); Assert.assertThat(_history.poll(), Matchers.nullValue()); } @@ -417,8 +352,6 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_in.isReady(), Matchers.equalTo(false)); - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0")); - Assert.assertThat(_history.poll(), Matchers.equalTo("s.onReadUnready")); Assert.assertThat(_history.poll(), Matchers.nullValue()); _in.addContent(new TContent("AB")); @@ -485,8 +418,6 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_in.isReady(), Matchers.equalTo(false)); - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0")); - Assert.assertThat(_history.poll(), Matchers.equalTo("s.onReadUnready")); Assert.assertThat(_history.poll(), Matchers.nullValue()); _in.addContent(new TContent("AB")); @@ -532,8 +463,6 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_in.isReady(), Matchers.equalTo(false)); - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0")); - Assert.assertThat(_history.poll(), Matchers.equalTo("s.onReadUnready")); Assert.assertThat(_history.poll(), Matchers.nullValue()); _in.failed(new TimeoutException()); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpManyWaysToCommitTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpManyWaysToCommitTest.java index 37963cecb95..aab9d880f37 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpManyWaysToCommitTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpManyWaysToCommitTest.java @@ -428,7 +428,6 @@ public class HttpManyWaysToCommitTest extends AbstractHttpTest server.start(); HttpTester.Response response = executeRequest(); - System.out.println(response.toString()); assertThat("response code", response.getStatus(), is(200)); assertHeader(response, "content-length", "6"); byte content[] = response.getContentBytes(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpOutputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpOutputTest.java index b4ec9937e79..126d591c8de 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpOutputTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpOutputTest.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.server.HttpOutput.Interceptor; import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.HotSwapHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.resource.Resource; @@ -49,10 +50,12 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; /** * */ +@RunWith(AdvancedRunner.class) public class HttpOutputTest { private Server _server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpVersionCustomizerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpVersionCustomizerTest.java index 546e255a243..36979bf1c01 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpVersionCustomizerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpVersionCustomizerTest.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -37,7 +38,9 @@ import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HttpVersionCustomizerTest { @Rule diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java index e1a21a2eaf7..d3f930c30d1 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java @@ -26,13 +26,16 @@ import java.nio.charset.StandardCharsets; import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.Utf8StringBuilder; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class HttpWriterTest { private HttpOutput _httpOut; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/InclusiveByteRangeTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/InclusiveByteRangeTest.java index c0ebde4a40d..ffde218de93 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/InclusiveByteRangeTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/InclusiveByteRangeTest.java @@ -25,8 +25,11 @@ import static org.junit.Assert.assertNull; import java.util.List; import java.util.Vector; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class InclusiveByteRangeTest { @SuppressWarnings("unchecked") diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java index 2fad97b53b1..7b368fc0b8d 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java @@ -18,12 +18,16 @@ package org.eclipse.jetty.server; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; -public class InsufficientThreadsDetectionTest { +@RunWith(AdvancedRunner.class) +public class InsufficientThreadsDetectionTest +{ private Server _server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java index 97cd56cd630..a6122f38fa2 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -34,17 +35,17 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.server.session.SessionHandler; -import org.eclipse.jetty.util.thread.Locker; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class LocalAsyncContextTest { - private final AtomicReference _completed0 = new AtomicReference<>(); - private final AtomicReference _completed1 = new AtomicReference<>(); protected Server _server; protected SuspendHandler _handler; protected Connector _connector; @@ -68,8 +69,6 @@ public class LocalAsyncContextTest public void reset() { - _completed0.set(null); - _completed1.set(null); } protected Connector initConnector() @@ -94,9 +93,6 @@ public class LocalAsyncContextTest _handler.setCompleteAfter(-1); response = process(null); check(response, "TIMEOUT"); - - spinAssertEquals(1, () -> _completed0.get() == null ? 0 : 1); - spinAssertEquals(1, () -> _completed1.get() == null ? 0 : 1); } @Test @@ -225,9 +221,6 @@ public class LocalAsyncContextTest _handler.setCompleteAfter2(-1); response = process(null); check(response, "STARTASYNC", "DISPATCHED", "startasync", "STARTASYNC2", "DISPATCHED"); - - spinAssertEquals(1, () -> _completed0.get() == null ? 0 : 1); - spinAssertEquals(0, () -> _completed1.get() == null ? 0 : 1); } protected void check(String response, String... content) @@ -337,7 +330,6 @@ public class LocalAsyncContextTest final AsyncContext asyncContext = baseRequest.startAsync(); response.getOutputStream().println("STARTASYNC"); asyncContext.addListener(__asyncListener); - asyncContext.addListener(__asyncListener1); if (_suspendFor > 0) asyncContext.setTimeout(_suspendFor); @@ -479,31 +471,11 @@ public class LocalAsyncContextTest @Override public void onComplete(AsyncEvent event) throws IOException { - Throwable complete = new Throwable(); - if (!_completed0.compareAndSet(null, complete)) - { - System.err.println("First onCompleted:"); - _completed0.get().printStackTrace(); - System.err.println("First onCompleted:"); - complete.printStackTrace(); - _completed0.set(null); - throw new IllegalStateException(); - } } @Override public void onError(AsyncEvent event) throws IOException { - Throwable complete = new Throwable(); - if (!_completed0.compareAndSet(null, complete)) - { - System.err.println("First onCompleted:"); - _completed0.get().printStackTrace(); - System.err.println("First onCompleted:"); - complete.printStackTrace(); - _completed0.set(null); - throw new IllegalStateException(); - } } @Override @@ -521,45 +493,6 @@ public class LocalAsyncContextTest } }; - private AsyncListener __asyncListener1 = new AsyncListener() - { - @Override - public void onComplete(AsyncEvent event) throws IOException - { - Throwable complete = new Throwable(); - if (!_completed1.compareAndSet(null, complete)) - { - _completed1.get().printStackTrace(); - complete.printStackTrace(); - _completed1.set(null); - throw new IllegalStateException(); - } - } - - @Override - public void onError(AsyncEvent event) throws IOException - { - Throwable complete = new Throwable(); - if (!_completed1.compareAndSet(null, complete)) - { - _completed1.get().printStackTrace(); - complete.printStackTrace(); - _completed1.set(null); - throw new IllegalStateException(); - } - } - - @Override - public void onStartAsync(AsyncEvent event) throws IOException - { - } - - @Override - public void onTimeout(AsyncEvent event) throws IOException - { - } - }; - static void spinAssertEquals(T expected, Supplier actualSupplier) { spinAssertEquals(expected, actualSupplier, 10, TimeUnit.SECONDS); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalConnectorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalConnectorTest.java index dd209cb8e68..c3922ae1dda 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalConnectorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalConnectorTest.java @@ -30,11 +30,14 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class LocalConnectorTest { private Server _server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java index c298b772962..085551b1dfd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/LowResourcesMonitorTest.java @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.CountDownLatch; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; @@ -35,7 +36,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class LowResourcesMonitorTest { QueuedThreadPool _threadPool; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java index 457e129d854..c40dfa04c05 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java @@ -39,10 +39,13 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class NetworkTrafficListenerTest { private static final byte END_OF_CONTENT = '~'; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java new file mode 100644 index 00000000000..4c4514f91dc --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java @@ -0,0 +1,281 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(AdvancedRunner.class) +public class NotAcceptingTest +{ + @Test + public void testServerConnectorBlockingAccept() throws Exception + { + Server server = new Server(); + ServerConnector connector = new ServerConnector(server,1,1); + connector.setPort(0); + connector.setIdleTimeout(500); + connector.setAcceptQueueSize(10); + server.addConnector(connector); + TestHandler handler = new TestHandler(); + server.setHandler(handler); + + server.start(); + + try(Socket client0 = new Socket("localhost",connector.getLocalPort());) + { + HttpTester.Input in0 = HttpTester.from(client0.getInputStream()); + + client0.getOutputStream().write("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + String uri = handler.exchange.exchange("data"); + assertThat(uri,is("/one")); + HttpTester.Response response = HttpTester.parseResponse(in0); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("data")); + + connector.setAccepting(false); + + // 0th connection still working + client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + uri = handler.exchange.exchange("more data"); + assertThat(uri,is("/two")); + response = HttpTester.parseResponse(in0); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("more data")); + + + try(Socket client1 = new Socket("localhost",connector.getLocalPort());) + { + // can't stop next connection being accepted + HttpTester.Input in1 = HttpTester.from(client1.getInputStream()); + client1.getOutputStream().write("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + uri = handler.exchange.exchange("new connection"); + assertThat(uri,is("/three")); + response = HttpTester.parseResponse(in1); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("new connection")); + + + try(Socket client2 = new Socket("localhost",connector.getLocalPort());) + { + + HttpTester.Input in2 = HttpTester.from(client2.getInputStream()); + client2.getOutputStream().write("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + + try + { + uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); + Assert.fail(uri); + } + catch(TimeoutException e) + { + // Can we accept the original? + connector.setAccepting(true); + uri = handler.exchange.exchange("delayed connection"); + assertThat(uri,is("/four")); + response = HttpTester.parseResponse(in2); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("delayed connection")); + } + } + } + } + } + + + @Test + public void testLocalConnector() throws Exception + { + Server server = new Server(); + LocalConnector connector = new LocalConnector(server); + connector.setIdleTimeout(500); + server.addConnector(connector); + TestHandler handler = new TestHandler(); + server.setHandler(handler); + + server.start(); + + try(LocalEndPoint client0 = connector.connect()) + { + client0.addInputAndExecute(BufferUtil.toBuffer("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n")); + String uri = handler.exchange.exchange("data"); + assertThat(uri,is("/one")); + HttpTester.Response response = HttpTester.parseResponse(client0.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("data")); + + connector.setAccepting(false); + + // 0th connection still working + client0.addInputAndExecute(BufferUtil.toBuffer("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n")); + uri = handler.exchange.exchange("more data"); + assertThat(uri,is("/two")); + response = HttpTester.parseResponse(client0.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("more data")); + + + try(LocalEndPoint client1 = connector.connect()) + { + // can't stop next connection being accepted + client1.addInputAndExecute(BufferUtil.toBuffer("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n")); + uri = handler.exchange.exchange("new connection"); + assertThat(uri,is("/three")); + response = HttpTester.parseResponse(client1.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("new connection")); + + + try(LocalEndPoint client2 = connector.connect()) + { + client2.addInputAndExecute(BufferUtil.toBuffer("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n")); + + try + { + uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); + Assert.fail(uri); + } + catch(TimeoutException e) + { + // Can we accept the original? + connector.setAccepting(true); + uri = handler.exchange.exchange("delayed connection"); + assertThat(uri,is("/four")); + response = HttpTester.parseResponse(client2.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("delayed connection")); + } + } + } + } + } + + @Test + public void testServerConnectorAsyncAccept() throws Exception + { + Server server = new Server(); + ServerConnector connector = new ServerConnector(server,0,1); + connector.setPort(0); + connector.setIdleTimeout(500); + connector.setAcceptQueueSize(10); + server.addConnector(connector); + TestHandler handler = new TestHandler(); + server.setHandler(handler); + + server.start(); + + try(Socket client0 = new Socket("localhost",connector.getLocalPort());) + { + HttpTester.Input in0 = HttpTester.from(client0.getInputStream()); + + client0.getOutputStream().write("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + String uri = handler.exchange.exchange("data"); + assertThat(uri,is("/one")); + HttpTester.Response response = HttpTester.parseResponse(in0); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("data")); + + connector.setAccepting(false); + + // 0th connection still working + client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + uri = handler.exchange.exchange("more data"); + assertThat(uri,is("/two")); + response = HttpTester.parseResponse(in0); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("more data")); + + + try(Socket client1 = new Socket("localhost",connector.getLocalPort());) + { + HttpTester.Input in1 = HttpTester.from(client1.getInputStream()); + client1.getOutputStream().write("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); + + try + { + uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); + Assert.fail(uri); + } + catch(TimeoutException e) + { + // Can we accept the original? + connector.setAccepting(true); + uri = handler.exchange.exchange("delayed connection"); + assertThat(uri,is("/three")); + response = HttpTester.parseResponse(in1); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("delayed connection")); + } + } + } + } + + public static class TestHandler extends AbstractHandler + { + final Exchanger exchange = new Exchanger<>(); + transient int handled; + + public TestHandler() + { + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + String content = exchange.exchange(baseRequest.getRequestURI()); + baseRequest.setHandled(true); + handled++; + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().print(content); + } + catch (InterruptedException e) + { + throw new ServletException(e); + } + } + + public int getHandled() + { + return handled; + } + } + +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyConnectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyConnectionTest.java index 06e854d460e..f7d123a570b 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyConnectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyConnectionTest.java @@ -19,16 +19,19 @@ package org.eclipse.jetty.server; import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; /** * */ +@RunWith(AdvancedRunner.class) public class ProxyConnectionTest { private Server _server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyProtocolTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyProtocolTest.java index ff63e9202e0..f0cafe330c8 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyProtocolTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ProxyProtocolTest.java @@ -31,10 +31,13 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ProxyProtocolTest { private Server server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/RequestTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/RequestTest.java index 7471db4f8bc..a17988a52f5 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/RequestTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/RequestTest.java @@ -65,6 +65,7 @@ import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.FS; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.IO; @@ -77,7 +78,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class RequestTest { private static final Logger LOG = Log.getLogger(RequestTest.class); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResourceCacheTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResourceCacheTest.java index 45e3cc98a39..c7a03ea899e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResourceCacheTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResourceCacheTest.java @@ -31,12 +31,15 @@ import org.eclipse.jetty.http.CompressedContentFormat; import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.ResourceHttpContent; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.ResourceCollection; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ResourceCacheTest { @Test diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index bad490a1653..c34ceff9e36 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -76,6 +76,7 @@ import org.eclipse.jetty.server.session.NullSessionDataStore; import org.eclipse.jetty.server.session.Session; import org.eclipse.jetty.server.session.SessionData; import org.eclipse.jetty.server.session.SessionHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.TimerScheduler; @@ -84,7 +85,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ResponseTest { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorAsyncContextTest.java index 6341451d299..31f5187cfe3 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorAsyncContextTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorAsyncContextTest.java @@ -21,8 +21,11 @@ package org.eclipse.jetty.server; import java.net.Socket; import java.nio.charset.StandardCharsets; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.IO; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ServerConnectorAsyncContextTest extends LocalAsyncContextTest { @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorCloseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorCloseTest.java index 5bb86b96a6c..b524b21305a 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorCloseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorCloseTest.java @@ -18,11 +18,14 @@ package org.eclipse.jetty.server; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.After; import org.junit.Before; +import org.junit.runner.RunWith; /* ------------------------------------------------------------ */ +@RunWith(AdvancedRunner.class) public class ServerConnectorCloseTest extends ConnectorCloseTestBase { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java index 10370b20d08..1529b6793d1 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java @@ -50,15 +50,19 @@ import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertTrue; + +@RunWith(AdvancedRunner.class) public class ServerConnectorTest { public static class ReuseInfoHandler extends AbstractHandler diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java index c2d935cb5f1..f50891498a6 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java @@ -34,13 +34,16 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.session.SessionHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest { @Before @@ -50,12 +53,19 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest connector.setIdleTimeout(MAX_IDLE_TIME); startServer(connector); } + + @Test(timeout=60000) + public void testStartStopStart() throws Exception + { + _server.stop(); + _server.start(); + } @Test(timeout=60000) public void testIdleTimeoutAfterSuspend() throws Exception { - SuspendHandler _handler = new SuspendHandler(); _server.stop(); + SuspendHandler _handler = new SuspendHandler(); SessionHandler session = new SessionHandler(); session.setHandler(_handler); _server.setHandler(session); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ShutdownMonitorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ShutdownMonitorTest.java index baae4bbf7ff..8ec43befeeb 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ShutdownMonitorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ShutdownMonitorTest.java @@ -26,14 +26,17 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.thread.ShutdownThread; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@RunWith(AdvancedRunner.class) public class ShutdownMonitorTest { @After @@ -46,7 +49,7 @@ public class ShutdownMonitorTest public void testStatus() throws Exception { ShutdownMonitor monitor = ShutdownMonitor.getInstance(); - monitor.setDebug(true); + // monitor.setDebug(true); monitor.setPort(0); monitor.setExitVm(false); monitor.start(); @@ -87,7 +90,7 @@ public class ShutdownMonitorTest private void testStartStop(boolean reusePort) throws Exception { ShutdownMonitor monitor = ShutdownMonitor.getInstance(); - monitor.setDebug(true); + // monitor.setDebug(true); monitor.setPort(0); monitor.setExitVm(false); monitor.start(); @@ -121,7 +124,7 @@ public class ShutdownMonitorTest public void testForceStopCommand() throws Exception { ShutdownMonitor monitor = ShutdownMonitor.getInstance(); - monitor.setDebug(true); + // monitor.setDebug(true); monitor.setPort(0); monitor.setExitVm(false); monitor.start(); @@ -152,7 +155,7 @@ public class ShutdownMonitorTest public void testOldStopCommandWithStopOnShutdownTrue() throws Exception { ShutdownMonitor monitor = ShutdownMonitor.getInstance(); - monitor.setDebug(true); + // monitor.setDebug(true); monitor.setPort(0); monitor.setExitVm(false); monitor.start(); @@ -184,7 +187,7 @@ public class ShutdownMonitorTest public void testOldStopCommandWithStopOnShutdownFalse() throws Exception { ShutdownMonitor monitor = ShutdownMonitor.getInstance(); - monitor.setDebug(true); + // monitor.setDebug(true); monitor.setPort(0); monitor.setExitVm(false); monitor.start(); @@ -213,7 +216,7 @@ public class ShutdownMonitorTest public void stop(String command, int port, String key, boolean check) throws Exception { - System.out.printf("Attempting to send " + command + " to localhost:%d (%b)%n", port, check); + // System.out.printf("Attempting to send " + command + " to localhost:%d (%b)%n", port, check); try (Socket s = new Socket(InetAddress.getByName("127.0.0.1"), port)) { // send stop command diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java index b69b83da3dc..a247bdbf729 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java @@ -36,10 +36,13 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(AdvancedRunner.class) public class SlowClientWithPipelinedRequestTest { private final AtomicInteger handles = new AtomicInteger(); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 6e8854d3aba..58c38c07bbf 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -56,7 +56,6 @@ import org.eclipse.jetty.util.thread.ReservedThreadExecutor; * the task and immediately continue producing. When operating in this pattern, the * sub-strategy is called ProduceExecuteConsume (PEC). *

- * */ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable { @@ -66,7 +65,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat private final Locker _locker = new Locker(); private State _state = State.IDLE; - private final Runnable _runProduce = new RunProduce(); private final Producer _producer; private final Executor _executor; private final ReservedThreadExecutor _producers; @@ -87,6 +85,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat _executor = executor; _producers = producers; addBean(_producer); + if (LOG.isDebugEnabled()) + LOG.debug("{} created", this); } @Override @@ -112,7 +112,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat if (LOG.isDebugEnabled()) LOG.debug("{} dispatch {}", this, execute); if (execute) - _executor.execute(_runProduce); + _executor.execute(this); } @Override @@ -126,6 +126,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat @Override public void produce() { + if (LOG.isDebugEnabled()) + LOG.debug("{} produce", this); boolean reproduce = true; while(isRunning() && tryProduce(reproduce) && doProduce()) reproduce = false; @@ -295,13 +297,4 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat builder.append('/'); builder.append(_producers); } - - private class RunProduce implements Runnable - { - @Override - public void run() - { - produce(); - } - } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java index 74709373579..15470916179 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java @@ -19,7 +19,9 @@ package org.eclipse.jetty.http.client; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -32,9 +34,12 @@ import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; +import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; @@ -43,6 +48,7 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; import org.junit.Assert; import org.junit.Test; @@ -517,7 +523,6 @@ public class ServerTimeoutsTest extends AbstractTest throw x; } } - }); DeferredContentProvider contentProvider = new DeferredContentProvider(); @@ -678,6 +683,59 @@ public class ServerTimeoutsTest extends AbstractTest Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testIdleTimeoutBeforeReadIsIgnored() throws Exception + { + long idleTimeout = 1000; + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + try + { + Thread.sleep(2 * idleTimeout); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + setServerIdleTimeout(idleTimeout); + + byte[] data = new byte[1024]; + new Random().nextBytes(data); + byte[] data1 = new byte[data.length / 2]; + System.arraycopy(data, 0, data1, 0, data1.length); + byte[] data2 = new byte[data.length - data1.length]; + System.arraycopy(data, data1.length, data2, 0, data2.length); + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data1)); + CountDownLatch latch = new CountDownLatch(1); + client.newRequest(newURI()) + .path(servletPath) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isSucceeded()); + Assert.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + Assert.assertArrayEquals(data, getContent()); + latch.countDown(); + } + }); + + // Wait for the server application to block reading. + Thread.sleep(3 * idleTimeout); + content.offer(ByteBuffer.wrap(data2)); + content.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler { private final CountDownLatch handlerLatch;