diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index 3a8b02ca29b..75f7f5462af 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -216,8 +216,8 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl @Override public boolean content(ByteBuffer content) { - // TODO avoid creating the Content object with wrapper? - boolean handle = onContent(new HttpInput.Content(content)) || _delayedForContent; + HttpInput.Content c = _httpConnection.newContent(content); + boolean handle = onContent(c) || _delayedForContent; _delayedForContent=false; return handle; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index e0033ce27a8..874f094339e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -19,9 +19,11 @@ package org.eclipse.jetty.server; import java.io.IOException; +import java.lang.ref.Reference; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpGenerator; @@ -62,12 +64,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private final HttpGenerator _generator; private final HttpChannelOverHttp _channel; private final HttpParser _parser; + private final AtomicInteger _contentBufferReferences=new AtomicInteger(); private volatile ByteBuffer _requestBuffer = null; private volatile ByteBuffer _chunk = null; private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback(); private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback(); private final SendCallback _sendCallback = new SendCallback(); - private final HttpInput.PoisonPillContent _recycleRequestBuffer = new RecycleBufferContent(); /** * Get the current connection that this thread is dispatched to. @@ -281,6 +283,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http /* ------------------------------------------------------------ */ private int fillRequestBuffer() { + if (_contentBufferReferences.get()>0) + { + LOG.warn("{} fill with unconsumed content!",this); + return 0; + } + if (BufferUtil.isEmpty(_requestBuffer)) { // Can we fill? @@ -329,30 +337,19 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (LOG.isDebugEnabled()) LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer)); - boolean buffer_had_content=BufferUtil.hasContent(_requestBuffer); boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer); if (LOG.isDebugEnabled()) LOG.debug("{} parsed {} {}",this,handle,_parser); // recycle buffer ? - if (buffer_had_content) - { - if (BufferUtil.isEmpty(_requestBuffer)) - { - if (_parser.inContentState()) - _input.addContent(_recycleRequestBuffer); - else - releaseRequestBuffer(); - } - } - else - { + if (_contentBufferReferences.get()==0) releaseRequestBuffer(); - } + return handle; } - + + /* ------------------------------------------------------------ */ @Override public void onCompleted() { @@ -369,7 +366,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _channel.recycle(); _parser.reset(); _generator.reset(); - releaseRequestBuffer(); + if (_contentBufferReferences.get()==0) + releaseRequestBuffer(); + else + { + LOG.warn("{} lingering content references?!?!",this); + _requestBuffer=null; // Not returned to pool! + _contentBufferReferences.set(0); + } return; } } @@ -504,17 +508,25 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _sendCallback.iterate(); } - private final class RecycleBufferContent extends HttpInput.PoisonPillContent + + HttpInput.Content newContent(ByteBuffer c) { - private RecycleBufferContent() + return new Content(c); + } + + private class Content extends HttpInput.Content + { + public Content(ByteBuffer content) { - super("RECYCLE"); + super(content); + _contentBufferReferences.incrementAndGet(); } @Override public void succeeded() { - releaseRequestBuffer(); + if (_contentBufferReferences.decrementAndGet()==0) + releaseRequestBuffer(); } @Override @@ -768,4 +780,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { getEndPoint().fillInterested(_blockingReadCallback); } + + @Override + public String toString() + { + return super.toString()+" "+BufferUtil.toDetailString(_requestBuffer); + } } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java index 0cd68cc0c85..2eaadcfda3a 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java @@ -45,8 +45,10 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; +import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -149,13 +151,14 @@ public class HttpInputIntegrationTest /* ------------------------------------------------------------ */ /** * @param uri The URI to test, typically /ctx/test?mode=THE_MODE + * @param delayMs the delay in MS to use. * @param delayInFrame If null, send the request with no delays, if FALSE then send with delays between frames, if TRUE send with delays within frames * @param contentLength The content length header to send. * @param content The content to send, with each string to be converted to a chunk or a frame * @return The response received in HTTP/1 format * @throws Exception */ - String send(String uri,Boolean delayInFrame, int contentLength, List content) throws Exception; + String send(String uri,int delayMs, Boolean delayInFrame, int contentLength, List content) throws Exception; } @Parameterized.Parameters @@ -172,23 +175,8 @@ public class HttpInputIntegrationTest // + HTTP/2 // + SSL + HTTP/2 // + FASTCGI - for (String c : new String[]{"LOCAL","H1","H1S"}) + for (Class client : new Class[]{LocalClient.class,H1Client.class,H1SClient.class}) { - TestClient client; - switch(c) - { - case "LOCAL": - client=new LocalClient(); - break; - case "H1": - client=new H1Client(1); - break; - case "H1S": - client=new H1SClient(2); - break; - default: - throw new IllegalStateException(); - } // test async actions that are run: // + By a thread in a container callback @@ -217,13 +205,13 @@ public class HttpInputIntegrationTest // + known length + content + EOF // + known length + content + content + EOF - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,0,-1,new String[]{}}); - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,8,-1,new String[]{"content0"}}); - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,16,-1,new String[]{"content0","CONTENT1"}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,0,-1,new String[]{}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,8,-1,new String[]{"content0"}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,16,-1,new String[]{"content0","CONTENT1"}}); - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,0,0,new String[]{}}); - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,8,8,new String[]{"content0"}}); - tests.add(new Object[]{client,mode,dispatch,delayWithinFrame,200,16,16,new String[]{"content0","CONTENT1"}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,0,0,new String[]{}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,8,8,new String[]{"content0"}}); + tests.add(new Object[]{tests.size(),client,mode,dispatch,delayWithinFrame,200,16,16,new String[]{"content0","CONTENT1"}}); } } @@ -233,7 +221,8 @@ public class HttpInputIntegrationTest } - final TestClient _client; + final int _id; + final Class _client; final Mode _mode; final Boolean _delay; final int _status; @@ -241,8 +230,9 @@ public class HttpInputIntegrationTest final int _length; final List _send; - public HttpInputIntegrationTest(TestClient client, Mode mode,boolean dispatch,Boolean delay,int status,int read,int length,String... send) + public HttpInputIntegrationTest(int id,Class client, Mode mode,boolean dispatch,Boolean delay,int status,int read,int length,String... send) { + _id=id; _client=client; _mode=mode; __config.setDelayDispatchUntilContent(dispatch); @@ -334,22 +324,83 @@ public class HttpInputIntegrationTest } @Test - public void test() throws Exception + public void testOne() throws Exception { - System.err.printf("TEST c=%s, m=%s, d=%b D=%s content-length:%d expect=%d read=%d content:%s%n",_client.getClass().getSimpleName(),_mode,__config.isDelayDispatchUntilContent(),_delay,_length,_status,_read,_send); + System.err.printf("[%d] TEST c=%s, m=%s, delayDispatch=%b delayInFrame=%s content-length:%d expect=%d read=%d content:%s%n",_id,_client.getSimpleName(),_mode,__config.isDelayDispatchUntilContent(),_delay,_length,_status,_read,_send); - String response = _client.send("/ctx/test?mode="+_mode,_delay,_length,_send); + TestClient client=_client.newInstance(); + String response = client.send("/ctx/test?mode="+_mode,50,_delay,_length,_send); + + int sum=0; + for (String s:_send) + for (char c : s.toCharArray()) + sum+=c; assertThat(response,startsWith("HTTP")); assertThat(response,Matchers.containsString(" "+_status+" ")); assertThat(response,Matchers.containsString("read="+_read)); + assertThat(response,Matchers.containsString("sum="+sum)); } + + @Test + public void testStress() throws Exception + { + System.err.printf("[%d] STRESS c=%s, m=%s, delayDispatch=%b delayInFrame=%s content-length:%d expect=%d read=%d content:%s%n",_id,_client.getSimpleName(),_mode,__config.isDelayDispatchUntilContent(),_delay,_length,_status,_read,_send); + int sum=0; + for (String s:_send) + for (char c : s.toCharArray()) + sum+=c; + final int summation=sum; + + + final int threads=10; + final int loops=10; + + final AtomicInteger count = new AtomicInteger(0); + Thread[] t = new Thread[threads]; + + Runnable run = new Runnable() + { + @Override + public void run() + { + try + { + TestClient client=_client.newInstance(); + for (int j=0;j content) throws Exception + public String send(String uri, int delayMs, Boolean delayInFrame,int contentLength, List content) throws Exception { - int port=((NetworkConnector)__server.getConnectors()[_connector]).getLocalPort(); + int port=_connector.getLocalPort(); try (Socket client = newSocket("localhost", port)) { client.setSoTimeout(5000); + client.setTcpNoDelay(true); + client.setSoLinger(true,1); OutputStream out = client.getOutputStream(); StringBuilder buffer = new StringBuilder(); @@ -555,7 +630,7 @@ public class HttpInputIntegrationTest buffer.append("Host: localhost:").append(port).append("\r\n"); buffer.append("Connection: close\r\n"); - flush(out,buffer,delayInFrame,true); + flush(out,buffer,delayMs,delayInFrame,true); boolean chunked=contentLength<0; if (chunked) @@ -567,26 +642,26 @@ public class HttpInputIntegrationTest buffer.append("Content-Type: text/plain\r\n"); buffer.append("\r\n"); - flush(out,buffer,delayInFrame,false); + flush(out,buffer,delayMs,delayInFrame,false); for (String c : content) { if (chunked) { buffer.append("\r\n").append(Integer.toHexString(c.length())).append("\r\n"); - flush(out,buffer,delayInFrame,true); + flush(out,buffer,delayMs,delayInFrame,true); } buffer.append(c.substring(0,1)); - flush(out,buffer,delayInFrame,true); + flush(out,buffer,delayMs,delayInFrame,true); buffer.append(c.substring(1)); - flush(out,buffer,delayInFrame,false); + flush(out,buffer,delayMs,delayInFrame,false); } if (chunked) { buffer.append("\r\n0"); - flush(out,buffer,delayInFrame,true); + flush(out,buffer,delayMs,delayInFrame,true); buffer.append("\r\n\r\n"); } @@ -597,12 +672,13 @@ public class HttpInputIntegrationTest } - private void flush(OutputStream out, StringBuilder buffer, Boolean delayInFrame, boolean inFrame) throws Exception + private void flush(OutputStream out, StringBuilder buffer, int delayMs, Boolean delayInFrame, boolean inFrame) throws Exception { // Flush now if we should delay if (delayInFrame!=null && delayInFrame.equals(inFrame)) { flush(out,buffer); + Thread.sleep(delayMs); } } @@ -612,7 +688,6 @@ public class HttpInputIntegrationTest buffer.setLength(0); out.write(flush.getBytes(StandardCharsets.ISO_8859_1)); out.flush(); - Thread.sleep(50); } public Socket newSocket(String host, int port) throws IOException @@ -623,9 +698,16 @@ public class HttpInputIntegrationTest public static class H1SClient extends H1Client { - public H1SClient(int connector) + public H1SClient() { - super(connector); + for (Connector c:__server.getConnectors()) + { + if (c instanceof NetworkConnector && c.getDefaultConnectionFactory().getProtocol().equals("SSL")) + { + _connector=(NetworkConnector)c; + break; + } + } } public Socket newSocket(String host, int port) throws IOException