Removed duplicate handling from servlet. Now only handled in core in HttpChannelState and protected it from concurrent writes. Added additional tests. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
0ec9294f74
commit
b0d259118c
|
@ -217,6 +217,11 @@ public class MetaData implements Iterable<HttpField>
|
|||
return null;
|
||||
}
|
||||
|
||||
public boolean is100ContinueExpected()
|
||||
{
|
||||
return getHttpFields().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -20,9 +20,6 @@ import java.util.function.Supplier;
|
|||
|
||||
import org.eclipse.jetty.http.HttpException;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpGenerator;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -65,7 +62,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
private Content.Chunk _trailer;
|
||||
private boolean committed;
|
||||
private boolean _demand;
|
||||
private boolean _expects100Continue;
|
||||
|
||||
public HttpStreamOverHTTP2(HTTP2ServerConnection connection, HttpChannel httpChannel, HTTP2Stream stream)
|
||||
{
|
||||
|
@ -98,8 +94,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
|
||||
HttpFields fields = _requestMetaData.getHttpFields();
|
||||
|
||||
_expects100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
|
||||
if (_requestMetaData instanceof MetaData.ConnectRequest)
|
||||
tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());
|
||||
|
||||
|
@ -185,12 +179,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
// the two actions cancel each other, no need to further retain or release.
|
||||
chunk = createChunk(data);
|
||||
|
||||
// Some content is read, but the 100 Continue interim
|
||||
// response has not been sent yet, then don't bother
|
||||
// sending it later, as the client already sent the content.
|
||||
if (_expects100Continue && chunk.hasRemaining())
|
||||
_expects100Continue = false;
|
||||
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
_chunk = Content.Chunk.next(chunk);
|
||||
|
@ -218,11 +206,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
}
|
||||
else if (demand)
|
||||
{
|
||||
if (_expects100Continue)
|
||||
{
|
||||
_expects100Continue = false;
|
||||
send(_requestMetaData, HttpGenerator.CONTINUE_100_INFO, false, null, Callback.NOOP);
|
||||
}
|
||||
_stream.demand();
|
||||
}
|
||||
}
|
||||
|
@ -312,9 +295,6 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
|
|||
return;
|
||||
}
|
||||
|
||||
if (_expects100Continue && response.getStatus() == HttpStatus.CONTINUE_100)
|
||||
_expects100Continue = false;
|
||||
|
||||
headersFrame = new HeadersFrame(streamId, response, null, false);
|
||||
}
|
||||
else
|
||||
|
|
|
@ -22,9 +22,6 @@ import java.util.function.Supplier;
|
|||
|
||||
import org.eclipse.jetty.http.HttpException;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpGenerator;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
@ -56,7 +53,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
private MetaData.Response responseMetaData;
|
||||
private Content.Chunk chunk;
|
||||
private boolean committed;
|
||||
private boolean expects100Continue;
|
||||
|
||||
public HttpStreamOverHTTP3(ServerHTTP3StreamConnection connection, HttpChannel httpChannel, HTTP3StreamServer stream)
|
||||
{
|
||||
|
@ -89,8 +85,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
|
||||
HttpFields fields = requestMetaData.getHttpFields();
|
||||
|
||||
expects100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP3 request #{}/{}, {} {} {}{}{}",
|
||||
|
@ -156,12 +150,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
// the two actions cancel each other, no need to further retain or release.
|
||||
chunk = createChunk(data);
|
||||
|
||||
// Some content is read, but the 100 Continue interim
|
||||
// response has not been sent yet, then don't bother
|
||||
// sending it later, as the client already sent the content.
|
||||
if (expects100Continue && chunk.hasRemaining())
|
||||
expects100Continue = false;
|
||||
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
this.chunk = chunk;
|
||||
|
@ -186,11 +174,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
}
|
||||
else
|
||||
{
|
||||
if (expects100Continue)
|
||||
{
|
||||
expects100Continue = false;
|
||||
send(requestMetaData, HttpGenerator.CONTINUE_100_INFO, false, null, Callback.NOOP);
|
||||
}
|
||||
stream.demand();
|
||||
}
|
||||
}
|
||||
|
@ -285,9 +268,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
|
|||
return;
|
||||
}
|
||||
|
||||
if (expects100Continue && response.getStatus() == HttpStatus.CONTINUE_100)
|
||||
expects100Continue = false;
|
||||
|
||||
headersFrame = new HeadersFrame(response, false);
|
||||
}
|
||||
else
|
||||
|
|
|
@ -116,6 +116,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
private Consumer<Throwable> _onFailure;
|
||||
private Throwable _callbackFailure;
|
||||
private Attributes _cache;
|
||||
private boolean _expects100Continue;
|
||||
|
||||
public HttpChannelState(ConnectionMetaData connectionMetaData)
|
||||
{
|
||||
|
@ -153,6 +154,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_readFailure = null;
|
||||
_onFailure = null;
|
||||
_callbackFailure = null;
|
||||
_expects100Continue = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,6 +243,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
throw new IllegalStateException("duplicate request");
|
||||
_request = new ChannelRequest(this, request);
|
||||
_response = new ChannelResponse(_request);
|
||||
_expects100Continue = request.is100ContinueExpected();
|
||||
|
||||
HttpConfiguration httpConfiguration = getHttpConfiguration();
|
||||
HttpFields.Mutable responseHeaders = _response.getHeaders();
|
||||
|
@ -883,28 +886,43 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
boolean error;
|
||||
HttpStream stream;
|
||||
HttpChannelState httpChannelState;
|
||||
InterimCallback interimCallback = null;
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
httpChannelState = lockedGetHttpChannelState();
|
||||
stream = httpChannelState._stream;
|
||||
error = httpChannelState._readFailure != null;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("demand {}", httpChannelState);
|
||||
|
||||
error = httpChannelState._readFailure != null;
|
||||
if (!error)
|
||||
{
|
||||
if (httpChannelState._onContentAvailable != null)
|
||||
throw new IllegalArgumentException("demand pending");
|
||||
httpChannelState._onContentAvailable = demandCallback;
|
||||
}
|
||||
|
||||
stream = httpChannelState._stream;
|
||||
if (httpChannelState._expects100Continue && httpChannelState._response._writeCallback == null)
|
||||
{
|
||||
httpChannelState._response._writeCallback = interimCallback = new InterimCallback(httpChannelState);
|
||||
httpChannelState._expects100Continue = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (error)
|
||||
{
|
||||
httpChannelState._serializedInvoker.run(demandCallback);
|
||||
else
|
||||
}
|
||||
else if (interimCallback == null)
|
||||
{
|
||||
stream.demand();
|
||||
}
|
||||
else
|
||||
{
|
||||
stream.send(_metaData, new MetaData.Response(HttpStatus.CONTINUE_100, null, getConnectionMetaData().getHttpVersion(), HttpFields.EMPTY), false, null, interimCallback);
|
||||
interimCallback.whenComplete((v, t) -> stream.demand());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1017,6 +1035,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
*/
|
||||
public static class ChannelResponse implements Response, Callback
|
||||
{
|
||||
private static final CompletableFuture<Void> UNEXPECTED_100_CONTINUE = CompletableFuture.failedFuture(new IllegalStateException("100 not expected"));
|
||||
private static final CompletableFuture<Void> COMMITTED_100_CONTINUE = CompletableFuture.failedFuture(new IllegalStateException("Committed"));
|
||||
private final ChannelRequest _request;
|
||||
private final ResponseHttpFields _httpFields;
|
||||
protected int _status;
|
||||
|
@ -1117,13 +1137,13 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
long length = BufferUtil.length(content);
|
||||
|
||||
HttpChannelState httpChannel;
|
||||
HttpChannelState httpChannelState;
|
||||
HttpStream stream;
|
||||
Throwable writeFailure;
|
||||
MetaData.Response responseMetaData = null;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
httpChannelState = _request.lockedGetHttpChannelState();
|
||||
long totalWritten = _contentBytesWritten + length;
|
||||
writeFailure = _writeFailure;
|
||||
|
||||
|
@ -1131,11 +1151,17 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
if (_writeCallback != null)
|
||||
{
|
||||
if (_writeCallback instanceof InterimCallback interimCallback)
|
||||
{
|
||||
// Do this write after the interim callback.
|
||||
interimCallback.whenComplete((v, t) -> write(last, content, callback));
|
||||
return;
|
||||
}
|
||||
writeFailure = new WritePendingException();
|
||||
}
|
||||
else
|
||||
{
|
||||
long committedContentLength = httpChannel._committedContentLength;
|
||||
long committedContentLength = httpChannelState._committedContentLength;
|
||||
long contentLength = committedContentLength >= 0 ? committedContentLength : getHeaders().getLongField(HttpHeader.CONTENT_LENGTH);
|
||||
|
||||
if (contentLength >= 0 && totalWritten != contentLength)
|
||||
|
@ -1159,27 +1185,27 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
|
||||
// If no failure by this point, we can try to switch to sending state.
|
||||
if (writeFailure == null)
|
||||
writeFailure = httpChannel.lockedStreamSend(last, length);
|
||||
writeFailure = httpChannelState.lockedStreamSend(last, length);
|
||||
|
||||
if (writeFailure == NOTHING_TO_SEND)
|
||||
{
|
||||
httpChannel._serializedInvoker.run(callback::succeeded);
|
||||
httpChannelState._serializedInvoker.run(callback::succeeded);
|
||||
return;
|
||||
}
|
||||
// Have we failed in some way?
|
||||
if (writeFailure != null)
|
||||
{
|
||||
Throwable failure = writeFailure;
|
||||
httpChannel._serializedInvoker.run(() -> callback.failed(failure));
|
||||
httpChannelState._serializedInvoker.run(() -> callback.failed(failure));
|
||||
return;
|
||||
}
|
||||
|
||||
// No failure, do the actual stream send using the ChannelResponse as the callback.
|
||||
_writeCallback = callback;
|
||||
_contentBytesWritten = totalWritten;
|
||||
stream = httpChannel._stream;
|
||||
stream = httpChannelState._stream;
|
||||
if (_httpFields.commit())
|
||||
responseMetaData = lockedPrepareResponse(httpChannel, last);
|
||||
responseMetaData = lockedPrepareResponse(httpChannelState, last);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -1288,19 +1314,36 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
@Override
|
||||
public CompletableFuture<Void> writeInterim(int status, HttpFields headers)
|
||||
{
|
||||
Completable completable = new Completable();
|
||||
if (HttpStatus.isInterim(status))
|
||||
if (!HttpStatus.isInterim(status))
|
||||
return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid interim status code: " + status));
|
||||
|
||||
HttpStream stream;
|
||||
MetaData.Response response;
|
||||
InterimCallback interimCallback;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
HttpChannelState channel = _request.getHttpChannelState();
|
||||
HttpVersion version = channel.getConnectionMetaData().getHttpVersion();
|
||||
MetaData.Response response = new MetaData.Response(status, null, version, headers);
|
||||
channel._stream.send(_request._metaData, response, false, null, completable);
|
||||
HttpChannelState httpChannelState = _request.lockedGetHttpChannelState();
|
||||
stream = httpChannelState._stream;
|
||||
|
||||
if (status == HttpStatus.CONTINUE_100)
|
||||
{
|
||||
if (!httpChannelState._expects100Continue)
|
||||
return UNEXPECTED_100_CONTINUE;
|
||||
httpChannelState._expects100Continue = false;
|
||||
}
|
||||
|
||||
if (_httpFields.isCommitted())
|
||||
return status == HttpStatus.CONTINUE_100 ? COMMITTED_100_CONTINUE : CompletableFuture.failedFuture(new IllegalStateException("Committed"));
|
||||
if (_writeCallback != null)
|
||||
return CompletableFuture.failedFuture(new WritePendingException());
|
||||
|
||||
_writeCallback = interimCallback = new InterimCallback(httpChannelState);
|
||||
HttpVersion version = httpChannelState.getConnectionMetaData().getHttpVersion();
|
||||
response = new MetaData.Response(status, null, version, headers);
|
||||
}
|
||||
else
|
||||
{
|
||||
completable.failed(new IllegalArgumentException("Invalid interim status code: " + status));
|
||||
}
|
||||
return completable;
|
||||
|
||||
stream.send(_request._metaData, response, false, null, interimCallback);
|
||||
return interimCallback;
|
||||
}
|
||||
|
||||
MetaData.Response lockedPrepareResponse(HttpChannelState httpChannel, boolean last)
|
||||
|
@ -1635,6 +1678,40 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
}
|
||||
|
||||
private static class InterimCallback extends Callback.Completable
|
||||
{
|
||||
private final HttpChannelState _httpChannelState;
|
||||
|
||||
private InterimCallback(HttpChannelState httpChannelState)
|
||||
{
|
||||
_httpChannelState = httpChannelState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
completing();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
completing();
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
private void completing()
|
||||
{
|
||||
try (AutoLock ignore = _httpChannelState._lock.lock())
|
||||
{
|
||||
// Allow other writes to proceed
|
||||
if (_httpChannelState._response._writeCallback == this)
|
||||
_httpChannelState._response._writeCallback = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class HttpChannelSerializedInvoker extends SerializedInvoker
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -1237,7 +1237,14 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
|||
_uri.path("/");
|
||||
}
|
||||
|
||||
_request = new MetaData.Request(_parser.getBeginNanoTime(), _method, _uri.asImmutable(), _version, _headerBuilder, _contentLength);
|
||||
_request = new MetaData.Request(_parser.getBeginNanoTime(), _method, _uri.asImmutable(), _version, _headerBuilder, _contentLength)
|
||||
{
|
||||
@Override
|
||||
public boolean is100ContinueExpected()
|
||||
{
|
||||
return _expects100Continue;
|
||||
}
|
||||
};
|
||||
|
||||
Runnable handle = _httpChannel.onRequest(_request);
|
||||
++_requests;
|
||||
|
@ -1373,12 +1380,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
|
|||
return;
|
||||
}
|
||||
|
||||
if (_expects100Continue)
|
||||
{
|
||||
_expects100Continue = false;
|
||||
send(_request, HttpGenerator.CONTINUE_100_INFO, false, null, Callback.NOOP);
|
||||
}
|
||||
|
||||
tryFillInterested(_demandContentCallback);
|
||||
}
|
||||
|
||||
|
|
|
@ -790,10 +790,8 @@ public class ServletApiRequest implements HttpServletRequest
|
|||
if (_inputState != ServletContextRequest.INPUT_NONE && _inputState != ServletContextRequest.INPUT_STREAM)
|
||||
throw new IllegalStateException("READER");
|
||||
_inputState = ServletContextRequest.INPUT_STREAM;
|
||||
|
||||
if (getServletRequestInfo().getServletChannel().isExpecting100Continue())
|
||||
getServletRequestInfo().getServletChannel().continue100(getServletRequestInfo().getHttpInput().available());
|
||||
|
||||
// Try to write a 100 continue, ignoring failure result if it was not necessary.
|
||||
_servletChannel.getResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
|
||||
return getServletRequestInfo().getHttpInput();
|
||||
}
|
||||
|
||||
|
@ -1074,7 +1072,12 @@ public class ServletApiRequest implements HttpServletRequest
|
|||
};
|
||||
}
|
||||
|
||||
if (_reader == null || !charset.equals(_readerCharset))
|
||||
if (_reader != null && charset.equals(_readerCharset))
|
||||
{
|
||||
// Try to write a 100 continue, ignoring failure result if it was not necessary.
|
||||
_servletChannel.getResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
|
||||
}
|
||||
else
|
||||
{
|
||||
ServletInputStream in = getInputStream();
|
||||
_readerCharset = charset;
|
||||
|
@ -1089,10 +1092,6 @@ public class ServletApiRequest implements HttpServletRequest
|
|||
}
|
||||
};
|
||||
}
|
||||
else if (getServletRequestInfo().getServletChannel().isExpecting100Continue())
|
||||
{
|
||||
getServletRequestInfo().getServletChannel().continue100(getServletRequestInfo().getHttpInput().available());
|
||||
}
|
||||
_inputState = ServletContextRequest.INPUT_READER;
|
||||
return _reader;
|
||||
}
|
||||
|
|
|
@ -23,8 +23,6 @@ import jakarta.servlet.RequestDispatcher;
|
|||
import org.eclipse.jetty.ee10.servlet.ServletChannelState.Action;
|
||||
import org.eclipse.jetty.http.BadMessageException;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
|
@ -42,7 +40,6 @@ import org.eclipse.jetty.server.handler.ContextRequest;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.ExceptionUtil;
|
||||
import org.eclipse.jetty.util.HostPort;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.URIUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -81,7 +78,6 @@ public class ServletChannel
|
|||
private Request _request;
|
||||
private Response _response;
|
||||
private Callback _callback;
|
||||
private boolean _expects100Continue;
|
||||
|
||||
public ServletChannel(ServletContextHandler servletContextHandler, Request request)
|
||||
{
|
||||
|
@ -121,7 +117,6 @@ public class ServletChannel
|
|||
_httpInput.reopen();
|
||||
_request = _servletContextRequest = servletContextRequest;
|
||||
_response = _servletContextRequest.getServletContextResponse();
|
||||
_expects100Continue = servletContextRequest.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("associate {} -> {} : {}",
|
||||
|
@ -379,35 +374,6 @@ public class ServletChannel
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the associated response has the Expect header set to 100 Continue,
|
||||
* then accessing the input stream indicates that the handler/servlet
|
||||
* is ready for the request body and thus a 100 Continue response is sent.
|
||||
*
|
||||
* @param available estimate of the number of bytes that are available
|
||||
* @throws IOException if the InputStream cannot be created
|
||||
*/
|
||||
public void continue100(int available) throws IOException
|
||||
{
|
||||
if (isExpecting100Continue())
|
||||
{
|
||||
_expects100Continue = false;
|
||||
if (available == 0)
|
||||
{
|
||||
if (isCommitted())
|
||||
throw new IOException("Committed before 100 Continue");
|
||||
try
|
||||
{
|
||||
getServletContextResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY).get();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
throw IO.rethrow(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare to be reused.
|
||||
* @param x Any completion exception, or null for successful completion.
|
||||
|
@ -422,7 +388,6 @@ public class ServletChannel
|
|||
_request = null;
|
||||
_response = null;
|
||||
_callback = null;
|
||||
_expects100Continue = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -725,11 +690,6 @@ public class ServletChannel
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isExpecting100Continue()
|
||||
{
|
||||
return _expects100Continue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.ServerSocket;
|
|||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -111,6 +112,74 @@ public class HttpClientContinueTest extends AbstractTest
|
|||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testExpect100ContinueWithMultipleContentsRespond100ContinueBlocking(Transport transport) throws Exception
|
||||
{
|
||||
byte[][] contents = new byte[][]{
|
||||
"data1".getBytes(StandardCharsets.UTF_8), "data2".getBytes(StandardCharsets.UTF_8), "data3".getBytes(StandardCharsets.UTF_8)
|
||||
};
|
||||
AtomicReference<Thread> readerThreadRef = new AtomicReference<>();
|
||||
start(transport, new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
readerThreadRef.set(Thread.currentThread());
|
||||
// Send 100-Continue and copy the content back
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
});
|
||||
|
||||
ContentResponse response;
|
||||
try (AsyncRequestContent content = new AsyncRequestContent())
|
||||
{
|
||||
new Thread(() ->
|
||||
{
|
||||
for (byte[] b : contents)
|
||||
{
|
||||
try
|
||||
{
|
||||
// ensure that the reader will block/pause even after sending 100.
|
||||
await().atMost(5, TimeUnit.SECONDS).until(() ->
|
||||
{
|
||||
Thread thread = readerThreadRef.get();
|
||||
if (thread == null)
|
||||
return false;
|
||||
return thread.getState() == Thread.State.WAITING;
|
||||
});
|
||||
Callback.Completable callback = new Callback.Completable();
|
||||
content.write(b == contents[contents.length - 1], ByteBuffer.wrap(b), callback);
|
||||
callback.get();
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
t.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
response = client.newRequest(newURI(transport))
|
||||
.headers(headers -> headers.put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE))
|
||||
.body(content)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
}
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(200, response.getStatus());
|
||||
|
||||
int index = 0;
|
||||
byte[] responseContent = response.getContent();
|
||||
for (byte[] content : contents)
|
||||
{
|
||||
for (byte b : content)
|
||||
{
|
||||
assertEquals(b, responseContent[index++]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testExpect100ContinueWithChunkedContentRespond100Continue(Transport transport) throws Exception
|
||||
|
@ -145,6 +214,8 @@ public class HttpClientContinueTest extends AbstractTest
|
|||
|
||||
assertNotNull(response);
|
||||
assertEquals(200, response.getStatus());
|
||||
if (EnumSet.of(Transport.HTTP, Transport.HTTPS, Transport.UNIX_DOMAIN).contains(transport))
|
||||
assertTrue(response.getHeaders().contains(HttpHeader.TRANSFER_ENCODING, "chunked"));
|
||||
|
||||
int index = 0;
|
||||
byte[] responseContent = response.getContent();
|
||||
|
@ -792,7 +863,6 @@ public class HttpClientContinueTest extends AbstractTest
|
|||
try (ServerSocket server = new ServerSocket())
|
||||
{
|
||||
server.bind(new InetSocketAddress("localhost", 0));
|
||||
System.err.println("server listening on localhost:" + server.getLocalPort());
|
||||
|
||||
byte[] bytes = new byte[1024];
|
||||
new Random().nextBytes(bytes);
|
||||
|
@ -837,7 +907,6 @@ public class HttpClientContinueTest extends AbstractTest
|
|||
try (ServerSocket server = new ServerSocket())
|
||||
{
|
||||
server.bind(new InetSocketAddress("localhost", 0));
|
||||
System.err.println("server listening on localhost:" + server.getLocalPort());
|
||||
|
||||
// No Expect header, no content.
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
|
|
@ -98,7 +98,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
private ContextHandler.CoreContextRequest _coreRequest;
|
||||
private org.eclipse.jetty.server.Response _coreResponse;
|
||||
private Callback _coreCallback;
|
||||
private boolean _expects100Continue;
|
||||
|
||||
public HttpChannel(ContextHandler contextHandler, ConnectionMetaData connectionMetaData)
|
||||
{
|
||||
|
@ -432,35 +431,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the associated response has the Expect header set to 100 Continue,
|
||||
* then accessing the input stream indicates that the handler/servlet
|
||||
* is ready for the request body and thus a 100 Continue response is sent.
|
||||
*
|
||||
* @param available estimate of the number of bytes that are available
|
||||
* @throws IOException if the InputStream cannot be created
|
||||
*/
|
||||
public void send100Continue(int available) throws IOException
|
||||
{
|
||||
if (isExpecting100Continue())
|
||||
{
|
||||
_expects100Continue = false;
|
||||
if (available == 0)
|
||||
{
|
||||
if (isCommitted())
|
||||
throw new IOException("Committed before 100 Continue");
|
||||
try
|
||||
{
|
||||
_coreResponse.writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY).get();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
throw IO.rethrow(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void send102Processing(HttpFields headers) throws IOException
|
||||
{
|
||||
try
|
||||
|
@ -863,11 +833,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isExpecting100Continue()
|
||||
{
|
||||
return _expects100Continue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -890,7 +855,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
_coreRequest.addIdleTimeoutListener(_state::onIdleTimeout);
|
||||
_requests.incrementAndGet();
|
||||
_request.onRequest(coreRequest);
|
||||
_expects100Continue = coreRequest.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
|
||||
_combinedListener.onRequestBegin(_request);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
|
|
@ -825,10 +825,8 @@ public class Request implements HttpServletRequest
|
|||
if (_inputState != INPUT_NONE && _inputState != INPUT_STREAM)
|
||||
throw new IllegalStateException("READER");
|
||||
_inputState = INPUT_STREAM;
|
||||
|
||||
if (_channel.isExpecting100Continue())
|
||||
_channel.send100Continue(_input.available());
|
||||
|
||||
// Try to write a 100 continue, ignoring failure result if it was not necessary.
|
||||
_channel.getCoreResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
|
||||
return _input;
|
||||
}
|
||||
|
||||
|
@ -1050,7 +1048,12 @@ public class Request implements HttpServletRequest
|
|||
if (encoding == null)
|
||||
encoding = MimeTypes.ISO_8859_1;
|
||||
|
||||
if (_reader == null || !encoding.equalsIgnoreCase(_readerEncoding))
|
||||
if (_reader != null && encoding.equalsIgnoreCase(_readerEncoding))
|
||||
{
|
||||
// Try to write a 100 continue, ignoring failure result if it was not necessary.
|
||||
_channel.getCoreResponse().writeInterim(HttpStatus.CONTINUE_100, HttpFields.EMPTY);
|
||||
}
|
||||
else
|
||||
{
|
||||
ServletInputStream in = getInputStream();
|
||||
_readerEncoding = encoding;
|
||||
|
@ -1065,10 +1068,6 @@ public class Request implements HttpServletRequest
|
|||
}
|
||||
};
|
||||
}
|
||||
else if (_channel.isExpecting100Continue())
|
||||
{
|
||||
_channel.send100Continue(_input.available());
|
||||
}
|
||||
_inputState = INPUT_READER;
|
||||
return _reader;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue