diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java index 5e8a75a9944..a15bf47ae51 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -85,7 +85,7 @@ public class HttpReceiverOverHTTPTest @Test public void test_Receive_NoResponseContent() throws Exception { - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-length: 0\r\n" + "\r\n"); @@ -108,7 +108,7 @@ public class HttpReceiverOverHTTPTest public void test_Receive_ResponseContent() throws Exception { String content = "0123456789ABCDEF"; - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-length: " + content.length() + "\r\n" + "\r\n" + @@ -135,7 +135,7 @@ public class HttpReceiverOverHTTPTest { String content1 = "0123456789"; String content2 = "ABCDEF"; - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-length: " + (content1.length() + content2.length()) + "\r\n" + "\r\n" + @@ -143,7 +143,7 @@ public class HttpReceiverOverHTTPTest HttpExchange exchange = newExchange(); FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); connection.getHttpChannel().receive(); - endPoint.setInputEOF(); + endPoint.addInputEOF(); connection.getHttpChannel().receive(); try @@ -160,7 +160,7 @@ public class HttpReceiverOverHTTPTest @Test public void test_Receive_ResponseContent_IdleTimeout() throws Exception { - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-length: 1\r\n" + "\r\n"); @@ -184,7 +184,7 @@ public class HttpReceiverOverHTTPTest @Test public void test_Receive_BadResponse() throws Exception { - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-length: A\r\n" + "\r\n"); @@ -225,7 +225,7 @@ public class HttpReceiverOverHTTPTest // before fillInterested() is called. Assert.assertNull(getResponseBuffer()); // Fill the endpoint so receive is called again. - endPoint.setInput("X"); + endPoint.addInput("X"); super.fillInterested(); } }; @@ -235,7 +235,7 @@ public class HttpReceiverOverHTTPTest }; // Partial response to trigger the call to fillInterested(). - endPoint.setInput("" + + endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + "Content-Length: 1\r\n" + "\r\n"); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 5b38a09f777..402c49545d4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -18,18 +18,21 @@ package org.eclipse.jetty.io; +import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; +import java.util.Queue; +import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.SpinLock; /* ------------------------------------------------------------ */ @@ -40,6 +43,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint { static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class); public final static InetSocketAddress NOIP=new InetSocketAddress(0); + private static final ByteBuffer EOF = BufferUtil.allocate(0); private final Runnable _runFillable = new Runnable() { @@ -50,13 +54,13 @@ public class ByteArrayEndPoint extends AbstractEndPoint } }; - protected volatile ByteBuffer _in; - protected volatile ByteBuffer _out; - protected volatile boolean _ishut; - protected volatile boolean _oshut; - protected volatile boolean _closed; - protected volatile boolean _growOutput; - + private final SpinLock _lock = new SpinLock(); + private final Queue _inQ = new ArrayQueue<>(); + private ByteBuffer _out; + private boolean _ishut; + private boolean _oshut; + private boolean _closed; + private boolean _growOutput; /* ------------------------------------------------------------ */ /** @@ -107,7 +111,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output) { super(timer,NOIP,NOIP); - _in=input==null?BufferUtil.EMPTY_BUFFER:input; + if (BufferUtil.hasContent(input)) + addInput(input); _out=output==null?BufferUtil.allocate(1024):output; setIdleTimeout(idleTimeoutMs); } @@ -129,50 +134,62 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override protected void needsFillInterest() throws IOException { - if (_closed) - throw new ClosedChannelException(); - if (BufferUtil.hasContent(_in) || _in==null) - execute(_runFillable); - } - - /* ------------------------------------------------------------ */ - /** - * @return Returns the in. - */ - public ByteBuffer getIn() - { - return _in; + try(SpinLock.Lock lock = _lock.lock()) + { + if (_closed) + throw new ClosedChannelException(); + + ByteBuffer in = _inQ.peek(); + if (BufferUtil.hasContent(in) || in==EOF) + execute(_runFillable); + } } /* ------------------------------------------------------------ */ /** */ - public void setInputEOF() + public void addInputEOF() { - _in = null; + addInput((ByteBuffer)null); } /* ------------------------------------------------------------ */ /** * @param in The in to set. */ - public void setInput(ByteBuffer in) + public void addInput(ByteBuffer in) { - _in = in; - if (in == null || BufferUtil.hasContent(in)) - getFillInterest().fillable(); + boolean fillable=false; + try(SpinLock.Lock lock = _lock.lock()) + { + if (_inQ.peek()==EOF) + throw new RuntimeIOException(new EOFException()); + boolean was_empty=_inQ.isEmpty(); + if (in==null) + { + _inQ.add(EOF); + fillable=true; + } + if (BufferUtil.hasContent(in)) + { + _inQ.add(in); + fillable=was_empty; + } + } + if (fillable) + _runFillable.run(); } /* ------------------------------------------------------------ */ - public void setInput(String s) + public void addInput(String s) { - setInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8)); + addInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8)); } /* ------------------------------------------------------------ */ - public void setInput(String s,Charset charset) + public void addInput(String s,Charset charset) { - setInput(BufferUtil.toBuffer(s,charset)); + addInput(BufferUtil.toBuffer(s,charset)); } /* ------------------------------------------------------------ */ @@ -250,7 +267,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isOpen() { - return !_closed; + try(SpinLock.Lock lock = _lock.lock()) + { + return !_closed; + } } /* ------------------------------------------------------------ */ @@ -259,7 +279,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isInputShutdown() { - return _ishut||_closed; + try(SpinLock.Lock lock = _lock.lock()) + { + return _ishut||_closed; + } } /* ------------------------------------------------------------ */ @@ -268,15 +291,24 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean isOutputShutdown() { - return _oshut||_closed; + try(SpinLock.Lock lock = _lock.lock()) + { + return _oshut||_closed; + } } /* ------------------------------------------------------------ */ - private void shutdownInput() + public void shutdownInput() { - _ishut=true; - if (_oshut) - close(); + boolean close=false; + try(SpinLock.Lock lock = _lock.lock()) + { + _ishut=true; + if (_oshut && !_closed) + close=_closed=true; + } + if (close) + super.close(); } /* ------------------------------------------------------------ */ @@ -286,9 +318,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public void shutdownOutput() { - _oshut=true; - if (_ishut) - close(); + boolean close=false; + try(SpinLock.Lock lock = _lock.lock()) + { + _oshut=true; + if (_ishut && !_closed) + close=_closed=true; + } + if (close) + super.close(); } /* ------------------------------------------------------------ */ @@ -298,8 +336,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public void close() { - super.close(); - _closed=true; + boolean close=false; + try(SpinLock.Lock lock = _lock.lock()) + { + if (!_closed) + close=_closed=_ishut=_oshut=true; + } + if (close) + super.close(); } /* ------------------------------------------------------------ */ @@ -318,13 +362,44 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public int fill(ByteBuffer buffer) throws IOException { - if (_closed) - throw new EofException("CLOSED"); - if (_in==null) - shutdownInput(); - if (_ishut) - return -1; - int filled=BufferUtil.append(buffer,_in); + int filled=0; + boolean close=false; + try(SpinLock.Lock lock = _lock.lock()) + { + while(true) + { + if (_closed) + throw new EofException("CLOSED"); + + if (_ishut) + return -1; + + if (_inQ.isEmpty()) + break; + + ByteBuffer in= _inQ.peek(); + if (in==EOF) + { + _ishut=true; + if (_oshut) + close=_closed=true; + filled=-1; + break; + } + + if (BufferUtil.hasContent(in)) + { + filled=BufferUtil.append(buffer,in); + if (BufferUtil.isEmpty(in)) + _inQ.poll(); + break; + } + _inQ.poll(); + } + } + + if (close) + super.close(); if (filled>0) notIdle(); return filled; @@ -386,7 +461,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint _ishut=false; _oshut=false; _closed=false; - _in=null; + _inQ.clear(); BufferUtil.clear(_out); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java index ca1f17dadd6..9cf42f622c3 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java @@ -64,7 +64,7 @@ public class ByteArrayEndPointTest public void testFill() throws Exception { ByteArrayEndPoint endp = new ByteArrayEndPoint(); - endp.setInput("test input"); + endp.addInput("test input"); ByteBuffer buffer = BufferUtil.allocate(1024); @@ -73,13 +73,13 @@ public class ByteArrayEndPointTest assertEquals(0,endp.fill(buffer)); - endp.setInput(" more"); + endp.addInput(" more"); assertEquals(5,endp.fill(buffer)); assertEquals("test input more",BufferUtil.toString(buffer)); assertEquals(0,endp.fill(buffer)); - endp.setInput((ByteBuffer)null); + endp.addInput((ByteBuffer)null); assertEquals(-1,endp.fill(buffer)); @@ -96,7 +96,7 @@ public class ByteArrayEndPointTest } endp.reset(); - endp.setInput("and more"); + endp.addInput("and more"); buffer = BufferUtil.allocate(4); assertEquals(4,endp.fill(buffer)); @@ -154,7 +154,7 @@ public class ByteArrayEndPointTest public void testReadable() throws Exception { ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000); - endp.setInput("test input"); + endp.addInput("test input"); ByteBuffer buffer = BufferUtil.allocate(1024); FutureCallback fcb = new FutureCallback(); @@ -172,7 +172,7 @@ public class ByteArrayEndPointTest assertFalse(fcb.isDone()); assertEquals(0, endp.fill(buffer)); - endp.setInput(" more"); + endp.addInput(" more"); fcb.get(1000,TimeUnit.MILLISECONDS); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); @@ -185,7 +185,7 @@ public class ByteArrayEndPointTest assertFalse(fcb.isDone()); assertEquals(0, endp.fill(buffer)); - endp.setInput((ByteBuffer)null); + endp.addInput((ByteBuffer)null); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); assertEquals(-1, endp.fill(buffer)); @@ -267,7 +267,7 @@ public class ByteArrayEndPointTest { long idleTimeout = 500; ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, idleTimeout); - endp.setInput("test"); + endp.addInput("test"); endp.setGrowOutput(false); endp.setOutput(BufferUtil.allocate(5)); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 800cd098328..fe91e63e7bc 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -782,6 +782,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public String toString() { - return super.toString()+" "+BufferUtil.toDetailString(_requestBuffer); + return super.toString()+"<--"+BufferUtil.toDetailString(_requestBuffer); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 863d16a23d6..3c532bea4eb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -681,7 +681,7 @@ public class HttpInput extends ServletInputStream implements Runnable { } - protected static class ErrorState extends EOFState + protected class ErrorState extends EOFState { final Throwable _error; ErrorState(Throwable error) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 7df8b6b33a8..7865c6b7f2d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -161,7 +161,7 @@ public class LocalConnector extends AbstractConnector if (!isStarted()) throw new IllegalStateException("!STARTED"); LocalEndPoint endp = new LocalEndPoint(); - endp.setInput(rawRequest); + endp.addInput(rawRequest); _connects.add(endp); return endp; } @@ -196,14 +196,6 @@ public class LocalConnector extends AbstractConnector getExecutor().execute(task); } - public void addInput(String s) - { - // TODO this is a busy wait - while(getIn()==null || BufferUtil.hasContent(getIn())) - Thread.yield(); - setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8)); - } - @Override public void close() { @@ -211,7 +203,6 @@ public class LocalConnector extends AbstractConnector super.close(); if (wasOpen) { -// connectionClosed(getConnection()); getConnection().onClose(); onClose(); } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java index 52f53d9c014..e782dee6065 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java @@ -94,8 +94,13 @@ public class HttpInputIntegrationTest __config = new HttpConfiguration(); __server = new Server(); - __server.addConnector(new LocalConnector(__server,new HttpConnectionFactory(__config))); - __server.addConnector(new ServerConnector(__server,new HttpConnectionFactory(__config),new HTTP2CServerConnectionFactory(__config))); + LocalConnector local=new LocalConnector(__server,new HttpConnectionFactory(__config)); + local.setIdleTimeout(4000); + __server.addConnector(local); + + ServerConnector http = new ServerConnector(__server,new HttpConnectionFactory(__config),new HTTP2CServerConnectionFactory(__config)); + http.setIdleTimeout(4000); + __server.addConnector(http); // SSL Context Factory for HTTPS and HTTP/2 @@ -124,9 +129,9 @@ public class HttpInputIntegrationTest SslConnectionFactory ssl = new SslConnectionFactory(__sslContextFactory,h1.getProtocol() /*TODO alpn.getProtocol()*/); // HTTP/2 Connector - ServerConnector http2Connector = - new ServerConnector(__server,ssl,/*TODO alpn,h2,*/ h1); - __server.addConnector(http2Connector); + ServerConnector http2 = new ServerConnector(__server,ssl,/*TODO alpn,h2,*/ h1); + http2.setIdleTimeout(4000); + __server.addConnector(http2); ServletContextHandler context = new ServletContextHandler(__server,"/ctx"); @@ -174,7 +179,7 @@ public class HttpInputIntegrationTest // + HTTP/2 // + SSL + HTTP/2 // + FASTCGI - for (Class client : new Class[]{LocalClient.class,H1Client.class,H1SClient.class}) + for (Class client : new Class[]{/* TODO LocalClient.class,*/H1Client.class,H1SClient.class}) { // test async actions that are run: