Merge remote-tracking branch 'origin/jetty-9.4.x-5605-wakeup-blocked-threads' into jetty-10.0.x-5605-wakeup-blocked-threads
This commit is contained in:
commit
9611a252a5
|
@ -385,6 +385,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
|
|
||||||
private boolean upgrade()
|
private boolean upgrade()
|
||||||
{
|
{
|
||||||
|
// If we are fill interested, then a read is pending and we must abort
|
||||||
|
if (isFillInterested())
|
||||||
|
{
|
||||||
|
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
|
||||||
|
abort(new IllegalStateException());
|
||||||
|
}
|
||||||
|
|
||||||
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
|
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
|
||||||
if (connection == null)
|
if (connection == null)
|
||||||
return false;
|
return false;
|
||||||
|
@ -416,6 +423,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
if (upgrade())
|
if (upgrade())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// Drive to EOF, EarlyEOF or Error
|
||||||
|
boolean complete = _input.consumeAll();
|
||||||
|
|
||||||
// Finish consuming the request
|
// Finish consuming the request
|
||||||
// If we are still expecting
|
// If we are still expecting
|
||||||
if (_channel.isExpecting100Continue())
|
if (_channel.isExpecting100Continue())
|
||||||
|
@ -424,7 +434,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
||||||
_parser.close();
|
_parser.close();
|
||||||
}
|
}
|
||||||
// else abort if we can't consume all
|
// else abort if we can't consume all
|
||||||
else if (_generator.isPersistent() && !_input.consumeAll())
|
else if (_generator.isPersistent() && !complete)
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("unconsumed input {} {}", this, _parser);
|
LOG.debug("unconsumed input {} {}", this, _parser);
|
||||||
|
|
|
@ -135,6 +135,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
||||||
if (isFinished())
|
if (isFinished())
|
||||||
return !isError();
|
return !isError();
|
||||||
|
|
||||||
|
//TODO move to early EOF and notify blocking reader
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.nio.charset.Charset;
|
||||||
import java.nio.charset.CharsetEncoder;
|
import java.nio.charset.CharsetEncoder;
|
||||||
import java.nio.charset.CoderResult;
|
import java.nio.charset.CoderResult;
|
||||||
import java.nio.charset.CodingErrorAction;
|
import java.nio.charset.CodingErrorAction;
|
||||||
|
import java.util.ResourceBundle;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import javax.servlet.RequestDispatcher;
|
import javax.servlet.RequestDispatcher;
|
||||||
import javax.servlet.ServletOutputStream;
|
import javax.servlet.ServletOutputStream;
|
||||||
|
@ -435,10 +437,22 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
case BLOCKED:
|
case BLOCKED:
|
||||||
case UNREADY:
|
case UNREADY:
|
||||||
case PENDING:
|
case PENDING:
|
||||||
|
LOG.warn("Pending write in complete {} {}", this, _channel);
|
||||||
// An operation is in progress, so we soft close now
|
// An operation is in progress, so we soft close now
|
||||||
_softClose = true;
|
_softClose = true;
|
||||||
// then trigger a close from onWriteComplete
|
// then trigger a close from onWriteComplete
|
||||||
_state = State.CLOSE;
|
_state = State.CLOSE;
|
||||||
|
|
||||||
|
// But if we are blocked or there is more content to come, we must abort
|
||||||
|
// Note that this allows a pending async write to complete only if it is the last write
|
||||||
|
if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written))
|
||||||
|
{
|
||||||
|
CancellationException cancelled = new CancellationException();
|
||||||
|
_writeBlocker.fail(cancelled);
|
||||||
|
_channel.abort(cancelled);
|
||||||
|
_state = State.CLOSED;
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -1351,7 +1365,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
||||||
{
|
{
|
||||||
_state = State.OPEN;
|
_state = State.OPEN;
|
||||||
_apiState = ApiState.BLOCKING;
|
_apiState = ApiState.BLOCKING;
|
||||||
_softClose = false;
|
_softClose = true; // Stay closed until next request
|
||||||
_interceptor = _channel;
|
_interceptor = _channel;
|
||||||
HttpConfiguration config = _channel.getHttpConfiguration();
|
HttpConfiguration config = _channel.getHttpConfiguration();
|
||||||
_bufferSize = config.getOutputBufferSize();
|
_bufferSize = config.getOutputBufferSize();
|
||||||
|
|
|
@ -1680,6 +1680,11 @@ public class Request implements HttpServletRequest
|
||||||
*/
|
*/
|
||||||
public void setMetaData(MetaData.Request request)
|
public void setMetaData(MetaData.Request request)
|
||||||
{
|
{
|
||||||
|
if (_metaData == null && _input != null && _channel != null)
|
||||||
|
{
|
||||||
|
_input.recycle();
|
||||||
|
_channel.getResponse().getHttpOutput().reopen();
|
||||||
|
}
|
||||||
_metaData = request;
|
_metaData = request;
|
||||||
_method = request.getMethod();
|
_method = request.getMethod();
|
||||||
_httpFields = request.getFields();
|
_httpFields = request.getFields();
|
||||||
|
@ -1771,7 +1776,7 @@ public class Request implements HttpServletRequest
|
||||||
|
|
||||||
getHttpChannelState().recycle();
|
getHttpChannelState().recycle();
|
||||||
_requestAttributeListeners.clear();
|
_requestAttributeListeners.clear();
|
||||||
_input.recycle();
|
// Defer _input.recycle() until setMetaData on next request, TODO replace with recycle and reopen in 10
|
||||||
_metaData = null;
|
_metaData = null;
|
||||||
_httpFields = null;
|
_httpFields = null;
|
||||||
_trailers = null;
|
_trailers = null;
|
||||||
|
|
|
@ -0,0 +1,504 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.server;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.servlet.AsyncContext;
|
||||||
|
import javax.servlet.DispatcherType;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.http.HttpTester;
|
||||||
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
|
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||||
|
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class BlockingTest
|
||||||
|
{
|
||||||
|
private Server server;
|
||||||
|
ServerConnector connector;
|
||||||
|
private ContextHandler context;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp()
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
connector = new ServerConnector(server);
|
||||||
|
connector.setPort(0);
|
||||||
|
server.addConnector(connector);
|
||||||
|
|
||||||
|
context = new ContextHandler("/ctx");
|
||||||
|
|
||||||
|
HandlerList handlers = new HandlerList();
|
||||||
|
handlers.setHandlers(new Handler[]{context, new DefaultHandler()});
|
||||||
|
server.setHandler(handlers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockingReadThenNormalComplete() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AbstractHandler handler = new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int b = baseRequest.getHttpInput().read();
|
||||||
|
if (b == '1')
|
||||||
|
{
|
||||||
|
started.countDown();
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
readException.set(t);
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on second byte
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
response.setStatus(200);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
response.getOutputStream().print("OK\r\n");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
context.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
StringBuilder request = new StringBuilder();
|
||||||
|
request.append("POST /ctx/path/info HTTP/1.1\r\n")
|
||||||
|
.append("Host: localhost\r\n")
|
||||||
|
.append("Content-Type: test/data\r\n")
|
||||||
|
.append("Content-Length: 2\r\n")
|
||||||
|
.append("\r\n")
|
||||||
|
.append("1");
|
||||||
|
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
try (Socket socket = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(1000000);
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
assertThat(response.getStatus(), is(200));
|
||||||
|
assertThat(response.getContent(), containsString("OK"));
|
||||||
|
|
||||||
|
// Async thread should have stopped
|
||||||
|
assertTrue(stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
assertThat(readException.get(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNormalCompleteThenBlockingRead() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch completed = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AbstractHandler handler = new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int b = baseRequest.getHttpInput().read();
|
||||||
|
if (b == '1')
|
||||||
|
{
|
||||||
|
started.countDown();
|
||||||
|
completed.await(10, TimeUnit.SECONDS);
|
||||||
|
Thread.sleep(500);
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
readException.set(t);
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on second byte
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
response.setStatus(200);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
response.getOutputStream().print("OK\r\n");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
context.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
StringBuilder request = new StringBuilder();
|
||||||
|
request.append("POST /ctx/path/info HTTP/1.1\r\n")
|
||||||
|
.append("Host: localhost\r\n")
|
||||||
|
.append("Content-Type: test/data\r\n")
|
||||||
|
.append("Content-Length: 2\r\n")
|
||||||
|
.append("\r\n")
|
||||||
|
.append("1");
|
||||||
|
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
try (Socket socket = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(1000000);
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
assertThat(response.getStatus(), is(200));
|
||||||
|
assertThat(response.getContent(), containsString("OK"));
|
||||||
|
|
||||||
|
completed.countDown();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Async thread should have stopped
|
||||||
|
assertTrue(stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
assertThat(readException.get(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch completed = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AbstractHandler handler = new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
|
||||||
|
{
|
||||||
|
AsyncContext async = request.startAsync();
|
||||||
|
async.setTimeout(100);
|
||||||
|
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int b = baseRequest.getHttpInput().read();
|
||||||
|
if (b == '1')
|
||||||
|
{
|
||||||
|
started.countDown();
|
||||||
|
completed.await(10, TimeUnit.SECONDS);
|
||||||
|
Thread.sleep(500);
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
readException.set(t);
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on second byte
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
context.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
StringBuilder request = new StringBuilder();
|
||||||
|
request.append("POST /ctx/path/info HTTP/1.1\r\n")
|
||||||
|
.append("Host: localhost\r\n")
|
||||||
|
.append("Content-Type: test/data\r\n")
|
||||||
|
.append("Content-Length: 2\r\n")
|
||||||
|
.append("\r\n")
|
||||||
|
.append("1");
|
||||||
|
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
try (Socket socket = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(1000000);
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
assertThat(response.getStatus(), is(500));
|
||||||
|
assertThat(response.getContent(), containsString("AsyncContext timeout"));
|
||||||
|
|
||||||
|
completed.countDown();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Async thread should have stopped
|
||||||
|
assertTrue(stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
assertThat(readException.get(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockingReadThenSendError() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AbstractHandler handler = new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
|
||||||
|
{
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int b = baseRequest.getHttpInput().read();
|
||||||
|
if (b == '1')
|
||||||
|
{
|
||||||
|
started.countDown();
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
readException.set(t);
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on second byte
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
response.sendError(499);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
context.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
StringBuilder request = new StringBuilder();
|
||||||
|
request.append("POST /ctx/path/info HTTP/1.1\r\n")
|
||||||
|
.append("Host: localhost\r\n")
|
||||||
|
.append("Content-Type: test/data\r\n")
|
||||||
|
.append("Content-Length: 2\r\n")
|
||||||
|
.append("\r\n")
|
||||||
|
.append("1");
|
||||||
|
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
try (Socket socket = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(1000000);
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream());
|
||||||
|
assertThat(response, notNullValue());
|
||||||
|
assertThat(response.getStatus(), is(499));
|
||||||
|
|
||||||
|
// Async thread should have stopped
|
||||||
|
assertTrue(stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
assertThat(readException.get(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockingWriteThenNormalComplete() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AbstractHandler handler = new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
response.setStatus(200);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
byte[] data = new byte[16 * 1024];
|
||||||
|
Arrays.fill(data, (byte)'X');
|
||||||
|
data[data.length - 2] = '\r';
|
||||||
|
data[data.length - 1] = '\n';
|
||||||
|
OutputStream out = response.getOutputStream();
|
||||||
|
started.countDown();
|
||||||
|
while (true)
|
||||||
|
out.write(data);
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
readException.set(t);
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on write
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
context.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
StringBuilder request = new StringBuilder();
|
||||||
|
request.append("GET /ctx/path/info HTTP/1.1\r\n")
|
||||||
|
.append("Host: localhost\r\n")
|
||||||
|
.append("\r\n");
|
||||||
|
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
try (Socket socket = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(1000000);
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
|
||||||
|
|
||||||
|
// Read the header
|
||||||
|
List<String> header = new ArrayList<>();
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
String line = in.readLine();
|
||||||
|
if (line.length() == 0)
|
||||||
|
break;
|
||||||
|
header.add(line);
|
||||||
|
}
|
||||||
|
assertThat(header.get(0), containsString("200 OK"));
|
||||||
|
|
||||||
|
// read one line of content
|
||||||
|
String content = in.readLine();
|
||||||
|
assertThat(content, is("4000"));
|
||||||
|
content = in.readLine();
|
||||||
|
assertThat(content, startsWith("XXXXXXXX"));
|
||||||
|
|
||||||
|
// check that writing thread is stopped by end of request handling
|
||||||
|
assertTrue(stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// read until last line
|
||||||
|
String last = null;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
String line = in.readLine();
|
||||||
|
if (line == null)
|
||||||
|
break;
|
||||||
|
|
||||||
|
last = line;
|
||||||
|
}
|
||||||
|
|
||||||
|
// last line is not empty chunk, ie abnormal completion
|
||||||
|
assertThat(last, startsWith("XXXXX"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -643,6 +643,7 @@ public class ResponseTest
|
||||||
assertEquals("foo2/bar2;charset=utf-8", response.getContentType());
|
assertEquals("foo2/bar2;charset=utf-8", response.getContentType());
|
||||||
|
|
||||||
response.recycle();
|
response.recycle();
|
||||||
|
response.reopen();
|
||||||
|
|
||||||
response.setCharacterEncoding("utf16");
|
response.setCharacterEncoding("utf16");
|
||||||
response.setContentType("text/html; charset=utf-8");
|
response.setContentType("text/html; charset=utf-8");
|
||||||
|
@ -655,6 +656,7 @@ public class ResponseTest
|
||||||
assertEquals("text/xml;charset=utf-8", response.getContentType());
|
assertEquals("text/xml;charset=utf-8", response.getContentType());
|
||||||
|
|
||||||
response.recycle();
|
response.recycle();
|
||||||
|
response.reopen();
|
||||||
response.setCharacterEncoding("utf-16");
|
response.setCharacterEncoding("utf-16");
|
||||||
response.setContentType("foo/bar");
|
response.setContentType("foo/bar");
|
||||||
assertEquals("foo/bar;charset=utf-16", response.getContentType());
|
assertEquals("foo/bar;charset=utf-16", response.getContentType());
|
||||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.util;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
|
@ -44,10 +45,10 @@ public class SharedBlockingCallback
|
||||||
{
|
{
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SharedBlockingCallback.class);
|
||||||
|
|
||||||
private static Throwable IDLE = new ConstantThrowable("IDLE");
|
private static final Throwable IDLE = new ConstantThrowable("IDLE");
|
||||||
private static Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED");
|
private static final Throwable SUCCEEDED = new ConstantThrowable("SUCCEEDED");
|
||||||
|
|
||||||
private static Throwable FAILED = new ConstantThrowable("FAILED");
|
private static final Throwable FAILED = new ConstantThrowable("FAILED");
|
||||||
|
|
||||||
private final ReentrantLock _lock = new ReentrantLock();
|
private final ReentrantLock _lock = new ReentrantLock();
|
||||||
private final Condition _idle = _lock.newCondition();
|
private final Condition _idle = _lock.newCondition();
|
||||||
|
@ -76,6 +77,26 @@ public class SharedBlockingCallback
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean fail(Throwable cause)
|
||||||
|
{
|
||||||
|
Objects.requireNonNull(cause);
|
||||||
|
_lock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_blocker._state == null)
|
||||||
|
{
|
||||||
|
_blocker._state = new BlockerFailedException(cause);
|
||||||
|
_complete.signalAll();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.unlock();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
protected void notComplete(Blocker blocker)
|
protected void notComplete(Blocker blocker)
|
||||||
{
|
{
|
||||||
LOG.warn("Blocker not complete {}", blocker);
|
LOG.warn("Blocker not complete {}", blocker);
|
||||||
|
@ -145,10 +166,12 @@ public class SharedBlockingCallback
|
||||||
_state = cause;
|
_state = cause;
|
||||||
_complete.signalAll();
|
_complete.signalAll();
|
||||||
}
|
}
|
||||||
else if (_state instanceof BlockerTimeoutException)
|
else if (_state instanceof BlockerTimeoutException || _state instanceof BlockerFailedException)
|
||||||
{
|
{
|
||||||
// Failure arrived late, block() already
|
// Failure arrived late, block() already
|
||||||
// modified the state, nothing more to do.
|
// modified the state, nothing more to do.
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Failed after {}", _state);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -261,4 +284,12 @@ public class SharedBlockingCallback
|
||||||
private static class BlockerTimeoutException extends TimeoutException
|
private static class BlockerTimeoutException extends TimeoutException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class BlockerFailedException extends Exception
|
||||||
|
{
|
||||||
|
public BlockerFailedException(Throwable cause)
|
||||||
|
{
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.http.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class BlockedIOTest extends AbstractTest<TransportScenario>
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void init(Transport transport) throws IOException
|
||||||
|
{
|
||||||
|
setScenario(new TransportScenario(transport));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ArgumentsSource(TransportProvider.class)
|
||||||
|
public void testBlockingReadThenNormalComplete(Transport transport) throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch started = new CountDownLatch(1);
|
||||||
|
CountDownLatch stopped = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> readException = new AtomicReference<>();
|
||||||
|
AtomicReference<Throwable> rereadException = new AtomicReference<>();
|
||||||
|
|
||||||
|
init(transport);
|
||||||
|
scenario.start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
new Thread(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int b = baseRequest.getHttpInput().read();
|
||||||
|
if (b == '1')
|
||||||
|
{
|
||||||
|
started.countDown();
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable ex1)
|
||||||
|
{
|
||||||
|
readException.set(ex1);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}
|
||||||
|
catch (Throwable ex2)
|
||||||
|
{
|
||||||
|
rereadException.set(ex2);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
stopped.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// wait for thread to start and read first byte
|
||||||
|
started.await(10, TimeUnit.SECONDS);
|
||||||
|
// give it time to block on second byte
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
response.setStatus(200);
|
||||||
|
response.setContentType("text/plain");
|
||||||
|
response.getOutputStream().print("OK\r\n");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
DeferredContentProvider contentProvider = new DeferredContentProvider();
|
||||||
|
CountDownLatch ok = new CountDownLatch(2);
|
||||||
|
scenario.client.POST(scenario.newURI())
|
||||||
|
.content(contentProvider)
|
||||||
|
.onResponseContent((response, content) ->
|
||||||
|
{
|
||||||
|
assertThat(BufferUtil.toString(content), containsString("OK"));
|
||||||
|
ok.countDown();
|
||||||
|
})
|
||||||
|
.onResponseSuccess(response ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
assertThat(response.getStatus(), is(200));
|
||||||
|
stopped.await(10, TimeUnit.SECONDS);
|
||||||
|
ok.countDown();
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
t.printStackTrace();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.send(null);
|
||||||
|
contentProvider.offer(BufferUtil.toBuffer("1"));
|
||||||
|
|
||||||
|
assertTrue(ok.await(10, TimeUnit.SECONDS));
|
||||||
|
assertThat(readException.get(), instanceOf(IOException.class));
|
||||||
|
assertThat(rereadException.get(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue