Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-WebSocketUpgradeFilter
This commit is contained in:
commit
dfdcf3e4cc
|
@ -48,18 +48,6 @@ pipeline {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage("Build Javadoc") {
|
||||
agent { node { label 'linux' } }
|
||||
steps {
|
||||
container( 'jetty-build' ) {
|
||||
timeout( time: 40, unit: 'MINUTES' ) {
|
||||
mavenBuild( "jdk11",
|
||||
"install javadoc:javadoc -DskipTests -Dpmd.skip=true -Dcheckstyle.skip=true", "maven3", false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,8 +123,11 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
_contentQueue.clear();
|
||||
}
|
||||
copy.forEach(c -> c.failed(failure));
|
||||
HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1);
|
||||
boolean atEof = lastContent != null && lastContent.isEof();
|
||||
boolean atEof;
|
||||
try (AutoLock l = _lock.lock())
|
||||
{
|
||||
atEof = _specialContent != null && _specialContent.isEof();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed all content, EOF = {}", atEof);
|
||||
return atEof;
|
||||
|
|
|
@ -91,6 +91,7 @@ public interface ClientConnectionFactory
|
|||
* Tests whether one of the protocols of this class is also present in the given candidates list.
|
||||
*
|
||||
* @param candidates the candidates to match against
|
||||
* @param secure whether the protocol should be a secure one
|
||||
* @return whether one of the protocols of this class is present in the candidates
|
||||
*/
|
||||
public boolean matches(List<String> candidates, boolean secure)
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.eclipse.jetty.util.thread.Invocable;
|
|||
/**
|
||||
* <p>EndPoint is the abstraction for an I/O channel that transports bytes.</p>
|
||||
*
|
||||
* <h3>Asynchronous Methods</h3>
|
||||
* <p>Asynchronous Methods</p>
|
||||
* <p>The asynchronous scheduling methods of {@link EndPoint}
|
||||
* has been influenced by NIO.2 Futures and Completion
|
||||
* handlers, but does not use those actual interfaces because they have
|
||||
|
@ -42,7 +42,7 @@ import org.eclipse.jetty.util.thread.Invocable;
|
|||
* implementations of {@link Callback}, such as {@link FutureCallback} and
|
||||
* {@link IteratingCallback}.</p>
|
||||
*
|
||||
* <h3>Reads</h3>
|
||||
* <p>Reads</p>
|
||||
* <p>A {@link FutureCallback} can be used to block until an endpoint is ready
|
||||
* to fill bytes - the notification will be emitted by the NIO subsystem:</p>
|
||||
* <pre>
|
||||
|
@ -56,7 +56,7 @@ import org.eclipse.jetty.util.thread.Invocable;
|
|||
* int filled = endPoint.fill(byteBuffer);
|
||||
* </pre>
|
||||
*
|
||||
* <h3>Asynchronous Reads</h3>
|
||||
* <p>Asynchronous Reads</p>
|
||||
* <p>A {@link Callback} can be used to read asynchronously in its own dispatched
|
||||
* thread:</p>
|
||||
* <pre>
|
||||
|
@ -77,7 +77,7 @@ import org.eclipse.jetty.util.thread.Invocable;
|
|||
* });
|
||||
* </pre>
|
||||
*
|
||||
* <h3>Blocking Writes</h3>
|
||||
* <p>Blocking Writes</p>
|
||||
* <p>The write contract is that the callback is completed when all the bytes
|
||||
* have been written or there is a failure.
|
||||
* Blocking writes look like this:</p>
|
||||
|
|
|
@ -29,13 +29,10 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* JDBCLoginModule
|
||||
* <p>
|
||||
* JAAS LoginModule to retrieve user information from
|
||||
* a database and authenticate the user.
|
||||
* <h1>Notes</h1>
|
||||
* <p>This version uses plain old JDBC connections NOT
|
||||
* Datasources.
|
||||
* <p>JAAS LoginModule to retrieve user information from
|
||||
* a database and authenticate the user.</p>
|
||||
* <p>Notes</p>
|
||||
* <p>This version uses plain old JDBC connections NOT DataSources.</p>
|
||||
*/
|
||||
public class JDBCLoginModule extends AbstractDatabaseLoginModule
|
||||
{
|
||||
|
@ -51,7 +48,6 @@ public class JDBCLoginModule extends AbstractDatabaseLoginModule
|
|||
*
|
||||
* @return the connection for this datasource
|
||||
* @throws Exception if unable to get the connection
|
||||
* @see AbstractDatabaseLoginModule#getConnection()
|
||||
*/
|
||||
@Override
|
||||
public Connection getConnection()
|
||||
|
|
|
@ -130,8 +130,10 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failing all content with {}", (Object)failure);
|
||||
if (_content != null && !_content.isSpecial())
|
||||
if (_content != null)
|
||||
{
|
||||
if (_content.isSpecial())
|
||||
return _content.isEof();
|
||||
_content.failed(failure);
|
||||
_content = _content.isEof() ? EOF : null;
|
||||
if (_content == EOF)
|
||||
|
@ -146,15 +148,21 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
|
|||
LOG.debug("failed all content, EOF was not reached");
|
||||
return false;
|
||||
}
|
||||
c.skip(c.remaining());
|
||||
c.failed(failure);
|
||||
if (c.isSpecial())
|
||||
{
|
||||
_content = c;
|
||||
boolean atEof = c.isEof();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed all content, EOF = {}", atEof);
|
||||
return atEof;
|
||||
}
|
||||
c.skip(c.remaining());
|
||||
c.failed(failure);
|
||||
if (c.isEof())
|
||||
{
|
||||
_content = EOF;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -42,11 +42,11 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
private final byte[] _oneByteBuffer = new byte[1];
|
||||
private final BlockingContentProducer _blockingContentProducer;
|
||||
private final AsyncContentProducer _asyncContentProducer;
|
||||
|
||||
private final HttpChannelState _channelState;
|
||||
private ContentProducer _contentProducer;
|
||||
private boolean _consumedEof;
|
||||
private ReadListener _readListener;
|
||||
private long _contentConsumed;
|
||||
|
||||
public HttpInput(HttpChannelState state)
|
||||
{
|
||||
|
@ -56,8 +56,6 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_contentProducer = _blockingContentProducer;
|
||||
}
|
||||
|
||||
/* HttpInput */
|
||||
|
||||
public void recycle()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -66,6 +64,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_contentProducer = _blockingContentProducer;
|
||||
_consumedEof = false;
|
||||
_readListener = null;
|
||||
_contentConsumed = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -112,6 +111,18 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
}
|
||||
}
|
||||
|
||||
public int get(Content content, byte[] bytes, int offset, int length)
|
||||
{
|
||||
int consumed = content.get(bytes, offset, length);
|
||||
_contentConsumed += consumed;
|
||||
return consumed;
|
||||
}
|
||||
|
||||
public long getContentConsumed()
|
||||
{
|
||||
return _contentConsumed;
|
||||
}
|
||||
|
||||
public long getContentReceived()
|
||||
{
|
||||
return _contentProducer.getRawContentArrived();
|
||||
|
@ -216,7 +227,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
throw new IllegalStateException("read on unready input");
|
||||
if (!content.isSpecial())
|
||||
{
|
||||
int read = content.get(b, off, len);
|
||||
int read = get(content, b, off, len);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("read produced {} byte(s)", read);
|
||||
if (content.isEmpty())
|
||||
|
|
5
pom.xml
5
pom.xml
|
@ -1165,7 +1165,7 @@
|
|||
<dependency>
|
||||
<groupId>io.grpc</groupId>
|
||||
<artifactId>grpc-core</artifactId>
|
||||
<version>1.33.0</version>
|
||||
<version>1.33.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ant</groupId>
|
||||
|
@ -1367,6 +1367,9 @@
|
|||
<properties>
|
||||
<surefire.rerunFailingTestsCount>0</surefire.rerunFailingTestsCount>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>javadoc</module>
|
||||
</modules>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
|
|
|
@ -20,7 +20,16 @@ package org.eclipse.jetty.test;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -28,35 +37,51 @@ import javax.servlet.http.HttpServletResponse;
|
|||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.util.AsyncRequestContent;
|
||||
import org.eclipse.jetty.client.util.BytesRequestContent;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpConnection;
|
||||
import org.eclipse.jetty.server.HttpInput;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class GzipWithSendErrorTest
|
||||
{
|
||||
private Server server;
|
||||
private HttpClient client;
|
||||
private ServerConnector connector;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
|
@ -99,9 +124,8 @@ public class GzipWithSendErrorTest
|
|||
|
||||
response = client.newRequest(serverURI.resolve("/submit"))
|
||||
.method(HttpMethod.POST)
|
||||
.header(HttpHeader.CONTENT_ENCODING, "gzip")
|
||||
.header(HttpHeader.ACCEPT_ENCODING, "gzip")
|
||||
.content(new BytesContentProvider("text/plain", compressed("normal-A")))
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_ENCODING, "gzip"))
|
||||
.body(new BytesRequestContent("text/plain", compressed("normal-A")))
|
||||
.send();
|
||||
|
||||
assertEquals(200, response.getStatus(), "Response status on /submit (normal-A)");
|
||||
|
@ -109,9 +133,8 @@ public class GzipWithSendErrorTest
|
|||
|
||||
response = client.newRequest(serverURI.resolve("/fail"))
|
||||
.method(HttpMethod.POST)
|
||||
.header(HttpHeader.CONTENT_ENCODING, "gzip")
|
||||
.header(HttpHeader.ACCEPT_ENCODING, "gzip")
|
||||
.content(new BytesContentProvider("text/plain", compressed("normal-B")))
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_ENCODING, "gzip"))
|
||||
.body(new BytesRequestContent("text/plain", compressed("normal-B")))
|
||||
.send();
|
||||
|
||||
assertEquals(400, response.getStatus(), "Response status on /fail (normal-B)");
|
||||
|
@ -119,9 +142,8 @@ public class GzipWithSendErrorTest
|
|||
|
||||
response = client.newRequest(serverURI.resolve("/submit"))
|
||||
.method(HttpMethod.POST)
|
||||
.header(HttpHeader.CONTENT_ENCODING, "gzip")
|
||||
.header(HttpHeader.ACCEPT_ENCODING, "gzip")
|
||||
.content(new BytesContentProvider("text/plain", compressed("normal-C")))
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_ENCODING, "gzip"))
|
||||
.body(new BytesRequestContent("text/plain", compressed("normal-C")))
|
||||
.send();
|
||||
|
||||
assertEquals(200, response.getStatus(), "Response status on /submit (normal-C)");
|
||||
|
@ -139,6 +161,210 @@ public class GzipWithSendErrorTest
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make request with compressed content.
|
||||
* <p>
|
||||
* Request contains (roughly) 1 MB of request network data.
|
||||
* Which unpacks to 1 GB of zeros.
|
||||
* </p>
|
||||
* <p>
|
||||
* This test is to ensure that consumeAll only reads the network data,
|
||||
* and doesn't process it through the interceptors.
|
||||
* </p>
|
||||
*/
|
||||
@Test
|
||||
public void testGzipConsumeAllContentLengthBlocking() throws Exception
|
||||
{
|
||||
URI serverURI = server.getURI();
|
||||
|
||||
CountDownLatch serverRequestCompleteLatch = new CountDownLatch(1);
|
||||
// count of bytes against network read
|
||||
AtomicLong inputBytesIn = new AtomicLong(0L);
|
||||
AtomicLong inputContentReceived = new AtomicLong(0L);
|
||||
// count of bytes against API read
|
||||
AtomicLong inputContentConsumed = new AtomicLong(0L);
|
||||
|
||||
connector.addBean(new HttpChannel.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Request request)
|
||||
{
|
||||
HttpConnection connection = (HttpConnection)request.getHttpChannel().getConnection();
|
||||
HttpInput httpInput = request.getHttpInput();
|
||||
inputContentConsumed.set(httpInput.getContentConsumed());
|
||||
inputContentReceived.set(httpInput.getContentReceived());
|
||||
inputBytesIn.set(connection.getBytesIn());
|
||||
serverRequestCompleteLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// This is a doubly-compressed (with gzip) test resource.
|
||||
// There's no point putting into SCM the full 1MB file, when the
|
||||
// 3KB version is adequate.
|
||||
Path zerosCompressed = MavenTestingUtils.getTestResourcePathFile("zeros.gz.gz");
|
||||
byte[] compressedRequest;
|
||||
try (InputStream in = Files.newInputStream(zerosCompressed);
|
||||
GZIPInputStream gzipIn = new GZIPInputStream(in);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream())
|
||||
{
|
||||
IO.copy(gzipIn, out);
|
||||
compressedRequest = out.toByteArray();
|
||||
}
|
||||
|
||||
int sizeActuallySent = compressedRequest.length / 2;
|
||||
ByteBuffer start = ByteBuffer.wrap(compressedRequest, 0, sizeActuallySent);
|
||||
AsyncRequestContent content = new AsyncRequestContent(start)
|
||||
{
|
||||
@Override
|
||||
public long getLength()
|
||||
{
|
||||
return compressedRequest.length;
|
||||
}
|
||||
};
|
||||
AtomicReference<Response> clientResponseRef = new AtomicReference<>();
|
||||
CountDownLatch clientResponseSuccessLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientResultComplete = new CountDownLatch(1);
|
||||
|
||||
client.newRequest(serverURI.resolve("/fail"))
|
||||
.method(HttpMethod.POST)
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_TYPE, "application/octet-stream"))
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_ENCODING, "gzip"))
|
||||
.body(content)
|
||||
.onResponseSuccess((response) ->
|
||||
{
|
||||
clientResponseRef.set(response);
|
||||
clientResponseSuccessLatch.countDown();
|
||||
})
|
||||
.send((result) -> clientResultComplete.countDown());
|
||||
|
||||
assertTrue(clientResponseSuccessLatch.await(5, TimeUnit.SECONDS), "Result not received");
|
||||
Response response = clientResponseRef.get();
|
||||
assertEquals(400, response.getStatus(), "Response status on /fail");
|
||||
|
||||
assertEquals("close", response.getHeaders().get(HttpHeader.CONNECTION), "Response Connection header");
|
||||
|
||||
// Await for server side to complete the request
|
||||
assertTrue(serverRequestCompleteLatch.await(5, TimeUnit.SECONDS), "Request complete never occurred?");
|
||||
|
||||
// System.out.printf("Input Content Consumed: %,d%n", inputContentConsumed.get());
|
||||
// System.out.printf("Input Content Received: %,d%n", inputContentReceived.get());
|
||||
// System.out.printf("Input BytesIn Count: %,d%n", inputBytesIn.get());
|
||||
|
||||
// Servlet didn't read body content.
|
||||
assertThat("Request Input Content Consumed none", inputContentConsumed.get(), is(0L));
|
||||
// Content arrived to the HttpChannel, but not to the HttpInput.
|
||||
assertThat("Request Input Content Received", inputContentReceived.get(), is(0L));
|
||||
assertThat("Request Input Content Received less then initial buffer", inputContentReceived.get(), lessThanOrEqualTo((long)sizeActuallySent));
|
||||
assertThat("Request Connection BytesIn should have some minimal data", inputBytesIn.get(), greaterThanOrEqualTo(1024L));
|
||||
assertThat("Request Connection BytesIn read should not have read all of the data", inputBytesIn.get(), lessThanOrEqualTo((long)sizeActuallySent));
|
||||
|
||||
// Now provide rest
|
||||
content.offer(ByteBuffer.wrap(compressedRequest, sizeActuallySent, compressedRequest.length - sizeActuallySent));
|
||||
content.close();
|
||||
|
||||
assertTrue(clientResultComplete.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
/**
|
||||
* Make request with compressed content.
|
||||
* <p>
|
||||
* Request contains (roughly) 1 MB of request network data.
|
||||
* Which unpacks to 1 GB of zeros.
|
||||
* </p>
|
||||
* <p>
|
||||
* This test is to ensure that consumeAll only reads the network data,
|
||||
* and doesn't process it through the interceptors.
|
||||
* </p>
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testGzipConsumeAllChunkedBlockingOnLastBuffer(boolean read) throws Exception
|
||||
{
|
||||
URI serverURI = server.getURI();
|
||||
|
||||
CountDownLatch serverRequestCompleteLatch = new CountDownLatch(1);
|
||||
// count of bytes against network read
|
||||
AtomicLong inputBytesIn = new AtomicLong(0L);
|
||||
AtomicLong inputContentReceived = new AtomicLong(0L);
|
||||
// count of bytes against API read
|
||||
AtomicLong inputContentConsumed = new AtomicLong(0L);
|
||||
|
||||
connector.addBean(new HttpChannel.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Request request)
|
||||
{
|
||||
HttpConnection connection = (HttpConnection)request.getHttpChannel().getConnection();
|
||||
HttpInput httpInput = request.getHttpInput();
|
||||
inputContentConsumed.set(httpInput.getContentConsumed());
|
||||
inputContentReceived.set(httpInput.getContentReceived());
|
||||
inputBytesIn.set(connection.getBytesIn());
|
||||
serverRequestCompleteLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// This is a doubly-compressed (with gzip) test resource.
|
||||
// There's no point putting into SCM the full 1MB file, when the
|
||||
// 3KB version is adequate.
|
||||
Path zerosCompressed = MavenTestingUtils.getTestResourcePathFile("zeros.gz.gz");
|
||||
byte[] compressedRequest;
|
||||
try (InputStream in = Files.newInputStream(zerosCompressed);
|
||||
GZIPInputStream gzipIn = new GZIPInputStream(in);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream())
|
||||
{
|
||||
IO.copy(gzipIn, out);
|
||||
compressedRequest = out.toByteArray();
|
||||
}
|
||||
|
||||
int sizeActuallySent = compressedRequest.length / 2;
|
||||
ByteBuffer start = ByteBuffer.wrap(compressedRequest, 0, sizeActuallySent);
|
||||
AsyncRequestContent content = new AsyncRequestContent(start);
|
||||
AtomicReference<Response> clientResponseRef = new AtomicReference<>();
|
||||
CountDownLatch clientResponseSuccessLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientResultComplete = new CountDownLatch(1);
|
||||
|
||||
URI uri = serverURI.resolve("/fail?read=" + read);
|
||||
|
||||
client.newRequest(uri)
|
||||
.method(HttpMethod.POST)
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_TYPE, "application/octet-stream"))
|
||||
.headers(fields -> fields.put(HttpHeader.CONTENT_ENCODING, "gzip"))
|
||||
.body(content)
|
||||
.onResponseSuccess((response) ->
|
||||
{
|
||||
clientResponseRef.set(response);
|
||||
clientResponseSuccessLatch.countDown();
|
||||
})
|
||||
.send((result) -> clientResultComplete.countDown());
|
||||
|
||||
assertTrue(clientResponseSuccessLatch.await(5, TimeUnit.SECONDS), "Result not received");
|
||||
Response response = clientResponseRef.get();
|
||||
assertEquals(400, response.getStatus(), "Response status on /fail");
|
||||
|
||||
assertEquals("close", response.getHeaders().get(HttpHeader.CONNECTION), "Response Connection header");
|
||||
|
||||
// Await for server side to complete the request
|
||||
assertTrue(serverRequestCompleteLatch.await(5, TimeUnit.SECONDS), "Request complete never occurred?");
|
||||
|
||||
// System.out.printf("Input Content Consumed: %,d%n", inputContentConsumed.get());
|
||||
// System.out.printf("Input Content Received: %,d%n", inputContentReceived.get());
|
||||
// System.out.printf("Input BytesIn Count: %,d%n", inputBytesIn.get());
|
||||
|
||||
// Servlet read of body content.
|
||||
assertThat("Request Input Content Consumed " + (read ? "some" : "none"), inputContentConsumed.get(), is(read ? 1L : 0L));
|
||||
// Content arrived to the HttpChannel, but not to the HttpInput.
|
||||
assertThat("Request Input Content Received", inputContentReceived.get(), read ? greaterThan(0L) : is(0L));
|
||||
assertThat("Request Input Content Received less then initial buffer", inputContentReceived.get(), lessThanOrEqualTo((long)sizeActuallySent));
|
||||
assertThat("Request Connection BytesIn should have some minimal data", inputBytesIn.get(), greaterThanOrEqualTo(1024L));
|
||||
assertThat("Request Connection BytesIn read should not have read all of the data", inputBytesIn.get(), lessThanOrEqualTo((long)sizeActuallySent));
|
||||
|
||||
// Now provide rest
|
||||
content.offer(ByteBuffer.wrap(compressedRequest, sizeActuallySent, compressedRequest.length - sizeActuallySent));
|
||||
content.close();
|
||||
|
||||
assertTrue(clientResultComplete.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public static class PostServlet extends HttpServlet
|
||||
{
|
||||
@Override
|
||||
|
@ -146,8 +372,6 @@ public class GzipWithSendErrorTest
|
|||
{
|
||||
resp.setCharacterEncoding("utf-8");
|
||||
resp.setContentType("text/plain");
|
||||
resp.setHeader("X-Servlet", req.getServletPath());
|
||||
|
||||
String reqBody = IO.toString(req.getInputStream(), UTF_8);
|
||||
resp.getWriter().append(reqBody);
|
||||
}
|
||||
|
@ -158,7 +382,12 @@ public class GzipWithSendErrorTest
|
|||
@Override
|
||||
protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException
|
||||
{
|
||||
resp.setHeader("X-Servlet", req.getServletPath());
|
||||
boolean read = Boolean.parseBoolean(req.getParameter("read"));
|
||||
if (read)
|
||||
{
|
||||
int val = req.getInputStream().read();
|
||||
assertNotEquals(-1, val);
|
||||
}
|
||||
// intentionally do not read request body here.
|
||||
resp.sendError(400);
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.stream.Stream;
|
|||
import javax.net.ssl.SSLSocket;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -61,8 +60,6 @@ import org.eclipse.jetty.util.IO;
|
|||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.condition.DisabledOnJre;
|
||||
import org.junit.jupiter.api.condition.JRE;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
@ -78,13 +75,8 @@ public class HttpInputIntegrationTest
|
|||
BLOCKING, ASYNC_DISPATCHED, ASYNC_OTHER_DISPATCHED, ASYNC_OTHER_WAIT
|
||||
}
|
||||
|
||||
public static final String EOF = "__EOF__";
|
||||
public static final String DELAY = "__DELAY__";
|
||||
public static final String ABORT = "__ABORT__";
|
||||
|
||||
private static Server __server;
|
||||
private static HttpConfiguration __config;
|
||||
private static HttpConfiguration __sslConfig;
|
||||
private static SslContextFactory.Server __sslContextFactory;
|
||||
|
||||
@BeforeAll
|
||||
|
@ -107,14 +99,15 @@ public class HttpInputIntegrationTest
|
|||
__sslContextFactory.setKeyStorePassword("storepwd");
|
||||
|
||||
// HTTPS Configuration
|
||||
__sslConfig = new HttpConfiguration(__config);
|
||||
__sslConfig.addCustomizer(new SecureRequestCustomizer());
|
||||
HttpConfiguration sslConfig = new HttpConfiguration(__config);
|
||||
sslConfig.addCustomizer(new SecureRequestCustomizer());
|
||||
|
||||
// HTTP/1 Connection Factory
|
||||
HttpConnectionFactory h1 = new HttpConnectionFactory(__sslConfig);
|
||||
HttpConnectionFactory h1 = new HttpConnectionFactory(sslConfig);
|
||||
|
||||
/* TODO
|
||||
// HTTP/2 Connection Factory
|
||||
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(__sslConfig);
|
||||
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(sslConfig);
|
||||
|
||||
NegotiatingServerConnectionFactory.checkProtocolNegotiationAvailable();
|
||||
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
|
||||
|
@ -145,7 +138,6 @@ public class HttpInputIntegrationTest
|
|||
|
||||
interface TestClient
|
||||
{
|
||||
|
||||
/**
|
||||
* @param uri The URI to test, typically /ctx/test?mode=THE_MODE
|
||||
* @param delayMs the delay in MS to use.
|
||||
|
@ -160,7 +152,6 @@ public class HttpInputIntegrationTest
|
|||
public static Stream<Arguments> scenarios()
|
||||
{
|
||||
List<Scenario> tests = new ArrayList<>();
|
||||
|
||||
// TODO other client types!
|
||||
// test with the following clients/protocols:
|
||||
// + Local
|
||||
|
@ -169,16 +160,14 @@ public class HttpInputIntegrationTest
|
|||
// + HTTP/2
|
||||
// + SSL + HTTP/2
|
||||
// + FASTCGI
|
||||
for (Class<? extends TestClient> client : new Class[]{LocalClient.class, H1Client.class, H1SClient.class})
|
||||
for (Class<? extends TestClient> client : Arrays.asList(LocalClient.class, H1Client.class, H1SClient.class))
|
||||
{
|
||||
|
||||
// test async actions that are run:
|
||||
// + By a thread in a container callback
|
||||
// + By another thread while a container callback is active
|
||||
// + By another thread while no container callback is active
|
||||
for (Mode mode : Mode.values())
|
||||
{
|
||||
|
||||
// test servlet dispatch with:
|
||||
// + Delayed dispatch on
|
||||
// + Delayed dispatch off
|
||||
|
@ -213,7 +202,7 @@ public class HttpInputIntegrationTest
|
|||
return tests.stream().map(Arguments::of);
|
||||
}
|
||||
|
||||
private static void runmode(Mode mode, final Request request, final Runnable test)
|
||||
private static void runMode(Mode mode, Request request, Runnable test)
|
||||
{
|
||||
switch (mode)
|
||||
{
|
||||
|
@ -224,22 +213,18 @@ public class HttpInputIntegrationTest
|
|||
}
|
||||
case ASYNC_OTHER_DISPATCHED:
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
new Thread()
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
new Thread(() ->
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
test.run();
|
||||
}
|
||||
finally
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
test.run();
|
||||
}
|
||||
}.start();
|
||||
finally
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
}).start();
|
||||
// prevent caller returning until other thread complete
|
||||
try
|
||||
{
|
||||
|
@ -254,40 +239,35 @@ public class HttpInputIntegrationTest
|
|||
}
|
||||
case ASYNC_OTHER_WAIT:
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final HttpChannelState.State S = request.getHttpChannelState().getState();
|
||||
new Thread()
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
HttpChannelState.State state = request.getHttpChannelState().getState();
|
||||
new Thread(() ->
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!latch.await(5, TimeUnit.SECONDS))
|
||||
fail("latch expired");
|
||||
if (!latch.await(5, TimeUnit.SECONDS))
|
||||
fail("latch expired");
|
||||
|
||||
// Spin until state change
|
||||
HttpChannelState.State s = request.getHttpChannelState().getState();
|
||||
while (request.getHttpChannelState().getState() == S)
|
||||
{
|
||||
Thread.yield();
|
||||
s = request.getHttpChannelState().getState();
|
||||
}
|
||||
test.run();
|
||||
}
|
||||
catch (Exception e)
|
||||
// Spin until state change
|
||||
while (request.getHttpChannelState().getState() == state)
|
||||
{
|
||||
e.printStackTrace();
|
||||
Thread.yield();
|
||||
}
|
||||
test.run();
|
||||
}
|
||||
}.start();
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
// ensure other thread running before trying to return
|
||||
latch.countDown();
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -315,8 +295,6 @@ public class HttpInputIntegrationTest
|
|||
|
||||
@ParameterizedTest(name = "[{index}] STRESS {0}")
|
||||
@MethodSource("scenarios")
|
||||
// JDK 11's SSLSocket is not reliable enough to run this test.
|
||||
@DisabledOnJre(JRE.JAVA_11)
|
||||
public void testStress(Scenario scenario) throws Exception
|
||||
{
|
||||
int sum = 0;
|
||||
|
@ -327,37 +305,33 @@ public class HttpInputIntegrationTest
|
|||
sum += c;
|
||||
}
|
||||
}
|
||||
final int summation = sum;
|
||||
int summation = sum;
|
||||
|
||||
final int threads = 10;
|
||||
final int loops = 10;
|
||||
int threads = 10;
|
||||
int loops = 10;
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
Thread[] t = new Thread[threads];
|
||||
|
||||
Runnable run = new Runnable()
|
||||
Runnable run = () ->
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
try
|
||||
{
|
||||
try
|
||||
TestClient client = scenario._client.getDeclaredConstructor().newInstance();
|
||||
for (int j = 0; j < loops; j++)
|
||||
{
|
||||
TestClient client = scenario._client.getDeclaredConstructor().newInstance();
|
||||
for (int j = 0; j < loops; j++)
|
||||
{
|
||||
String response = client.send("/ctx/test?mode=" + scenario._mode, 10, scenario._delay, scenario._length, scenario._send);
|
||||
assertTrue(response.startsWith("HTTP"));
|
||||
assertTrue(response.contains(" " + scenario._status + " "));
|
||||
assertTrue(response.contains("read=" + scenario._read));
|
||||
assertTrue(response.contains("sum=" + summation));
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
String response = client.send("/ctx/test?mode=" + scenario._mode, 10, scenario._delay, scenario._length, scenario._send);
|
||||
assertTrue(response.startsWith("HTTP"), response);
|
||||
assertTrue(response.contains(" " + scenario._status + " "), response);
|
||||
assertTrue(response.contains("read=" + scenario._read), response);
|
||||
assertTrue(response.contains("sum=" + summation), response);
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < threads; i++)
|
||||
|
@ -370,17 +344,17 @@ public class HttpInputIntegrationTest
|
|||
t[i].join();
|
||||
}
|
||||
|
||||
assertEquals(count.get(), threads * loops);
|
||||
assertEquals(threads * loops, count.get());
|
||||
}
|
||||
|
||||
public static class TestServlet extends HttpServlet
|
||||
{
|
||||
String expected = "content0CONTENT1";
|
||||
private final String expected = "content0CONTENT1";
|
||||
|
||||
@Override
|
||||
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException, IOException
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
|
||||
{
|
||||
final Mode mode = Mode.valueOf(req.getParameter("mode"));
|
||||
Mode mode = Mode.valueOf(req.getParameter("mode"));
|
||||
resp.setContentType("text/plain");
|
||||
|
||||
if (mode == Mode.BLOCKING)
|
||||
|
@ -409,81 +383,70 @@ public class HttpInputIntegrationTest
|
|||
else
|
||||
{
|
||||
// we are asynchronous
|
||||
final AsyncContext context = req.startAsync();
|
||||
AsyncContext context = req.startAsync();
|
||||
context.setTimeout(10000);
|
||||
final ServletInputStream in = req.getInputStream();
|
||||
final Request request = Request.getBaseRequest(req);
|
||||
final AtomicInteger read = new AtomicInteger(0);
|
||||
final AtomicInteger sum = new AtomicInteger(0);
|
||||
ServletInputStream in = req.getInputStream();
|
||||
Request request = Request.getBaseRequest(req);
|
||||
AtomicInteger read = new AtomicInteger(0);
|
||||
AtomicInteger sum = new AtomicInteger(0);
|
||||
|
||||
runmode(mode, request, new Runnable()
|
||||
runMode(mode, request, () -> in.setReadListener(new ReadListener()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
in.setReadListener(new ReadListener()
|
||||
t.printStackTrace();
|
||||
try
|
||||
{
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
resp.sendError(500);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
context.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataAvailable()
|
||||
{
|
||||
runMode(mode, request, () ->
|
||||
{
|
||||
while (in.isReady() && !in.isFinished())
|
||||
{
|
||||
t.printStackTrace();
|
||||
try
|
||||
{
|
||||
resp.sendError(500);
|
||||
int b = in.read();
|
||||
if (b < 0)
|
||||
return;
|
||||
sum.addAndGet(b);
|
||||
int i = read.getAndIncrement();
|
||||
if (b != expected.charAt(i))
|
||||
{
|
||||
System.err.printf("XXX '%c'!='%c' at %d%n", expected.charAt(i), (char)b, i);
|
||||
System.err.println(" " + request.getHttpChannel());
|
||||
System.err.println(" " + request.getHttpChannel().getHttpTransport());
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
onError(e);
|
||||
}
|
||||
context.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataAvailable() throws IOException
|
||||
{
|
||||
runmode(mode, request, new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
while (in.isReady() && !in.isFinished())
|
||||
{
|
||||
try
|
||||
{
|
||||
int b = in.read();
|
||||
if (b < 0)
|
||||
return;
|
||||
sum.addAndGet(b);
|
||||
int i = read.getAndIncrement();
|
||||
if (b != expected.charAt(i))
|
||||
{
|
||||
System.err.printf("XXX '%c'!='%c' at %d%n", expected.charAt(i), (char)b, i);
|
||||
System.err.println(" " + request.getHttpChannel());
|
||||
System.err.println(" " + request.getHttpChannel().getHttpTransport());
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
onError(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllDataRead() throws IOException
|
||||
{
|
||||
resp.setStatus(200);
|
||||
resp.setContentType("text/plain");
|
||||
resp.getWriter().println("read=" + read.get());
|
||||
resp.getWriter().println("sum=" + sum.get());
|
||||
context.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onAllDataRead() throws IOException
|
||||
{
|
||||
resp.setStatus(200);
|
||||
resp.setContentType("text/plain");
|
||||
resp.getWriter().println("read=" + read.get());
|
||||
resp.getWriter().println("sum=" + sum.get());
|
||||
context.complete();
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -526,7 +489,7 @@ public class HttpInputIntegrationTest
|
|||
flush(local, buffer, delayMs, delayInFrame, true);
|
||||
}
|
||||
|
||||
buffer.append(c.substring(0, 1));
|
||||
buffer.append(c.charAt(0));
|
||||
flush(local, buffer, delayMs, delayInFrame, true);
|
||||
buffer.append(c.substring(1));
|
||||
if (chunked)
|
||||
|
@ -556,9 +519,9 @@ public class HttpInputIntegrationTest
|
|||
}
|
||||
}
|
||||
|
||||
private void flush(final LocalEndPoint local, StringBuilder buffer) throws Exception
|
||||
private void flush(LocalEndPoint local, StringBuilder buffer)
|
||||
{
|
||||
final String flush = buffer.toString();
|
||||
String flush = buffer.toString();
|
||||
buffer.setLength(0);
|
||||
flushed.append(flush);
|
||||
local.addInputAndExecute(BufferUtil.toBuffer(flush));
|
||||
|
@ -619,7 +582,7 @@ public class HttpInputIntegrationTest
|
|||
flush(out, buffer, delayMs, delayInFrame, true);
|
||||
}
|
||||
|
||||
buffer.append(c.substring(0, 1));
|
||||
buffer.append(c.charAt(0));
|
||||
flush(out, buffer, delayMs, delayInFrame, true);
|
||||
buffer.append(c.substring(1));
|
||||
flush(out, buffer, delayMs, delayInFrame, false);
|
||||
|
@ -689,13 +652,13 @@ public class HttpInputIntegrationTest
|
|||
|
||||
public static class Scenario
|
||||
{
|
||||
final Class<? extends TestClient> _client;
|
||||
final Mode _mode;
|
||||
final Boolean _delay;
|
||||
final int _status;
|
||||
final int _read;
|
||||
final int _length;
|
||||
final List<String> _send;
|
||||
private final Class<? extends TestClient> _client;
|
||||
private final Mode _mode;
|
||||
private final Boolean _delay;
|
||||
private final int _status;
|
||||
private final int _read;
|
||||
private final int _length;
|
||||
private final List<String> _send;
|
||||
|
||||
public Scenario(Class<? extends TestClient> client, Mode mode, boolean dispatch, Boolean delay, int status, int read, int length, String... send)
|
||||
{
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue