Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.
This commit is contained in:
parent
ef84a797e9
commit
8245038a90
|
@ -125,6 +125,14 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
dispatcher.dispatch();
|
||||
}
|
||||
|
||||
public boolean onIdleTimeout(Throwable timeout)
|
||||
{
|
||||
boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout);
|
||||
if (handle)
|
||||
execute(this);
|
||||
return !handle;
|
||||
}
|
||||
|
||||
private static class Dispatcher implements Runnable
|
||||
{
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
|
||||
|
|
|
@ -105,6 +105,14 @@ public class ServerFCGIConnection extends AbstractConnection
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadTimeout(Throwable timeout)
|
||||
{
|
||||
return channels.values().stream()
|
||||
.mapToInt(channel -> channel.onIdleTimeout(timeout) ? 0 : 1)
|
||||
.sum() == 0;
|
||||
}
|
||||
|
||||
private void parse(ByteBuffer buffer)
|
||||
{
|
||||
while (buffer.hasRemaining())
|
||||
|
|
|
@ -315,7 +315,7 @@ public class HttpConfiguration implements Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* @param delay if true, delay the application dispatch until content is available (default false)
|
||||
* @param delay if true, delays the application dispatch until content is available (defaults to true)
|
||||
*/
|
||||
public void setDelayDispatchUntilContent(boolean delay)
|
||||
{
|
||||
|
|
|
@ -732,22 +732,29 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
_listener = Objects.requireNonNull(readListener);
|
||||
|
||||
Content content = produceNextContext();
|
||||
if (content != null)
|
||||
if (isError())
|
||||
{
|
||||
_state = ASYNC;
|
||||
woken = _channelState.onReadReady();
|
||||
}
|
||||
else if (_state == EOF)
|
||||
{
|
||||
_state = AEOF;
|
||||
woken = _channelState.onReadEof();
|
||||
}
|
||||
else
|
||||
{
|
||||
_state = ASYNC;
|
||||
_channelState.onReadUnready();
|
||||
_waitingForContent = true;
|
||||
Content content = produceNextContext();
|
||||
if (content != null)
|
||||
{
|
||||
_state = ASYNC;
|
||||
woken = _channelState.onReadReady();
|
||||
}
|
||||
else if (_state == EOF)
|
||||
{
|
||||
_state = AEOF;
|
||||
woken = _channelState.onReadEof();
|
||||
}
|
||||
else
|
||||
{
|
||||
_state = ASYNC;
|
||||
_channelState.onReadUnready();
|
||||
_waitingForContent = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
|
@ -56,6 +58,7 @@ import org.eclipse.jetty.client.util.DeferredContentProvider;
|
|||
import org.eclipse.jetty.client.util.InputStreamContentProvider;
|
||||
import org.eclipse.jetty.client.util.InputStreamResponseListener;
|
||||
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
|
@ -1264,4 +1267,77 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
|
|||
Result result = listener.await(5, TimeUnit.SECONDS);
|
||||
assertTrue(result.isSucceeded());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testClientDefersContentServerIdleTimeout(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
CountDownLatch errorLatch = new CountDownLatch(1);
|
||||
scenario.start(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
request.getInputStream().setReadListener(new ReadListener()
|
||||
{
|
||||
@Override
|
||||
public void onDataAvailable()
|
||||
{
|
||||
dataLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllDataRead()
|
||||
{
|
||||
dataLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
errorLatch.countDown();
|
||||
response.setStatus(HttpStatus.REQUEST_TIMEOUT_408);
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
long idleTimeout = 1000;
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
byte[] bytes = "[{\"key\":\"value\"}]".getBytes(StandardCharsets.UTF_8);
|
||||
OutputStreamContentProvider content = new OutputStreamContentProvider()
|
||||
{
|
||||
@Override
|
||||
public long getLength()
|
||||
{
|
||||
return bytes.length;
|
||||
}
|
||||
};
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.method(HttpMethod.POST)
|
||||
.path(scenario.servletPath)
|
||||
.content(content, "application/json;charset=UTF-8")
|
||||
.onResponseSuccess(response ->
|
||||
{
|
||||
assertEquals(HttpStatus.REQUEST_TIMEOUT_408, response.getStatus());
|
||||
latch.countDown();
|
||||
})
|
||||
.send(null);
|
||||
|
||||
// Wait for the server to idle timeout.
|
||||
Thread.sleep(2 * idleTimeout);
|
||||
|
||||
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Do not send the content to the server.
|
||||
|
||||
assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue