Improved LocalConnector

This commit is contained in:
Greg Wilkins 2015-02-26 20:13:45 +11:00
parent af70c4bd48
commit 165ae8f238
7 changed files with 157 additions and 86 deletions

View File

@ -85,7 +85,7 @@ public class HttpReceiverOverHTTPTest
@Test @Test
public void test_Receive_NoResponseContent() throws Exception public void test_Receive_NoResponseContent() throws Exception
{ {
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" + "Content-length: 0\r\n" +
"\r\n"); "\r\n");
@ -108,7 +108,7 @@ public class HttpReceiverOverHTTPTest
public void test_Receive_ResponseContent() throws Exception public void test_Receive_ResponseContent() throws Exception
{ {
String content = "0123456789ABCDEF"; String content = "0123456789ABCDEF";
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-length: " + content.length() + "\r\n" + "Content-length: " + content.length() + "\r\n" +
"\r\n" + "\r\n" +
@ -135,7 +135,7 @@ public class HttpReceiverOverHTTPTest
{ {
String content1 = "0123456789"; String content1 = "0123456789";
String content2 = "ABCDEF"; String content2 = "ABCDEF";
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-length: " + (content1.length() + content2.length()) + "\r\n" + "Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" + "\r\n" +
@ -143,7 +143,7 @@ public class HttpReceiverOverHTTPTest
HttpExchange exchange = newExchange(); HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0); FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive(); connection.getHttpChannel().receive();
endPoint.setInputEOF(); endPoint.addInputEOF();
connection.getHttpChannel().receive(); connection.getHttpChannel().receive();
try try
@ -160,7 +160,7 @@ public class HttpReceiverOverHTTPTest
@Test @Test
public void test_Receive_ResponseContent_IdleTimeout() throws Exception public void test_Receive_ResponseContent_IdleTimeout() throws Exception
{ {
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" + "Content-length: 1\r\n" +
"\r\n"); "\r\n");
@ -184,7 +184,7 @@ public class HttpReceiverOverHTTPTest
@Test @Test
public void test_Receive_BadResponse() throws Exception public void test_Receive_BadResponse() throws Exception
{ {
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" + "Content-length: A\r\n" +
"\r\n"); "\r\n");
@ -225,7 +225,7 @@ public class HttpReceiverOverHTTPTest
// before fillInterested() is called. // before fillInterested() is called.
Assert.assertNull(getResponseBuffer()); Assert.assertNull(getResponseBuffer());
// Fill the endpoint so receive is called again. // Fill the endpoint so receive is called again.
endPoint.setInput("X"); endPoint.addInput("X");
super.fillInterested(); super.fillInterested();
} }
}; };
@ -235,7 +235,7 @@ public class HttpReceiverOverHTTPTest
}; };
// Partial response to trigger the call to fillInterested(). // Partial response to trigger the call to fillInterested().
endPoint.setInput("" + endPoint.addInput("" +
"HTTP/1.1 200 OK\r\n" + "HTTP/1.1 200 OK\r\n" +
"Content-Length: 1\r\n" + "Content-Length: 1\r\n" +
"\r\n"); "\r\n");

View File

@ -18,18 +18,21 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit; import java.util.Queue;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -40,6 +43,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{ {
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class); static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
public final static InetSocketAddress NOIP=new InetSocketAddress(0); public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private static final ByteBuffer EOF = BufferUtil.allocate(0);
private final Runnable _runFillable = new Runnable() private final Runnable _runFillable = new Runnable()
{ {
@ -50,13 +54,13 @@ public class ByteArrayEndPoint extends AbstractEndPoint
} }
}; };
protected volatile ByteBuffer _in; private final SpinLock _lock = new SpinLock();
protected volatile ByteBuffer _out; private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
protected volatile boolean _ishut; private ByteBuffer _out;
protected volatile boolean _oshut; private boolean _ishut;
protected volatile boolean _closed; private boolean _oshut;
protected volatile boolean _growOutput; private boolean _closed;
private boolean _growOutput;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -107,7 +111,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output) public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
{ {
super(timer,NOIP,NOIP); super(timer,NOIP,NOIP);
_in=input==null?BufferUtil.EMPTY_BUFFER:input; if (BufferUtil.hasContent(input))
addInput(input);
_out=output==null?BufferUtil.allocate(1024):output; _out=output==null?BufferUtil.allocate(1024):output;
setIdleTimeout(idleTimeoutMs); setIdleTimeout(idleTimeoutMs);
} }
@ -129,50 +134,62 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
protected void needsFillInterest() throws IOException protected void needsFillInterest() throws IOException
{ {
if (_closed) try(SpinLock.Lock lock = _lock.lock())
throw new ClosedChannelException(); {
if (BufferUtil.hasContent(_in) || _in==null) if (_closed)
execute(_runFillable); throw new ClosedChannelException();
}
ByteBuffer in = _inQ.peek();
/* ------------------------------------------------------------ */ if (BufferUtil.hasContent(in) || in==EOF)
/** execute(_runFillable);
* @return Returns the in. }
*/
public ByteBuffer getIn()
{
return _in;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
*/ */
public void setInputEOF() public void addInputEOF()
{ {
_in = null; addInput((ByteBuffer)null);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @param in The in to set. * @param in The in to set.
*/ */
public void setInput(ByteBuffer in) public void addInput(ByteBuffer in)
{ {
_in = in; boolean fillable=false;
if (in == null || BufferUtil.hasContent(in)) try(SpinLock.Lock lock = _lock.lock())
getFillInterest().fillable(); {
if (_inQ.peek()==EOF)
throw new RuntimeIOException(new EOFException());
boolean was_empty=_inQ.isEmpty();
if (in==null)
{
_inQ.add(EOF);
fillable=true;
}
if (BufferUtil.hasContent(in))
{
_inQ.add(in);
fillable=was_empty;
}
}
if (fillable)
_runFillable.run();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setInput(String s) public void addInput(String s)
{ {
setInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8)); addInput(BufferUtil.toBuffer(s,StandardCharsets.UTF_8));
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setInput(String s,Charset charset) public void addInput(String s,Charset charset)
{ {
setInput(BufferUtil.toBuffer(s,charset)); addInput(BufferUtil.toBuffer(s,charset));
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -250,7 +267,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return !_closed; try(SpinLock.Lock lock = _lock.lock())
{
return !_closed;
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -259,7 +279,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public boolean isInputShutdown() public boolean isInputShutdown()
{ {
return _ishut||_closed; try(SpinLock.Lock lock = _lock.lock())
{
return _ishut||_closed;
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -268,15 +291,24 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
return _oshut||_closed; try(SpinLock.Lock lock = _lock.lock())
{
return _oshut||_closed;
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void shutdownInput() public void shutdownInput()
{ {
_ishut=true; boolean close=false;
if (_oshut) try(SpinLock.Lock lock = _lock.lock())
close(); {
_ishut=true;
if (_oshut && !_closed)
close=_closed=true;
}
if (close)
super.close();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -286,9 +318,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public void shutdownOutput() public void shutdownOutput()
{ {
_oshut=true; boolean close=false;
if (_ishut) try(SpinLock.Lock lock = _lock.lock())
close(); {
_oshut=true;
if (_ishut && !_closed)
close=_closed=true;
}
if (close)
super.close();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -298,8 +336,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public void close() public void close()
{ {
super.close(); boolean close=false;
_closed=true; try(SpinLock.Lock lock = _lock.lock())
{
if (!_closed)
close=_closed=_ishut=_oshut=true;
}
if (close)
super.close();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -318,13 +362,44 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public int fill(ByteBuffer buffer) throws IOException public int fill(ByteBuffer buffer) throws IOException
{ {
if (_closed) int filled=0;
throw new EofException("CLOSED"); boolean close=false;
if (_in==null) try(SpinLock.Lock lock = _lock.lock())
shutdownInput(); {
if (_ishut) while(true)
return -1; {
int filled=BufferUtil.append(buffer,_in); if (_closed)
throw new EofException("CLOSED");
if (_ishut)
return -1;
if (_inQ.isEmpty())
break;
ByteBuffer in= _inQ.peek();
if (in==EOF)
{
_ishut=true;
if (_oshut)
close=_closed=true;
filled=-1;
break;
}
if (BufferUtil.hasContent(in))
{
filled=BufferUtil.append(buffer,in);
if (BufferUtil.isEmpty(in))
_inQ.poll();
break;
}
_inQ.poll();
}
}
if (close)
super.close();
if (filled>0) if (filled>0)
notIdle(); notIdle();
return filled; return filled;
@ -386,7 +461,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
_ishut=false; _ishut=false;
_oshut=false; _oshut=false;
_closed=false; _closed=false;
_in=null; _inQ.clear();
BufferUtil.clear(_out); BufferUtil.clear(_out);
} }

View File

@ -64,7 +64,7 @@ public class ByteArrayEndPointTest
public void testFill() throws Exception public void testFill() throws Exception
{ {
ByteArrayEndPoint endp = new ByteArrayEndPoint(); ByteArrayEndPoint endp = new ByteArrayEndPoint();
endp.setInput("test input"); endp.addInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
@ -73,13 +73,13 @@ public class ByteArrayEndPointTest
assertEquals(0,endp.fill(buffer)); assertEquals(0,endp.fill(buffer));
endp.setInput(" more"); endp.addInput(" more");
assertEquals(5,endp.fill(buffer)); assertEquals(5,endp.fill(buffer));
assertEquals("test input more",BufferUtil.toString(buffer)); assertEquals("test input more",BufferUtil.toString(buffer));
assertEquals(0,endp.fill(buffer)); assertEquals(0,endp.fill(buffer));
endp.setInput((ByteBuffer)null); endp.addInput((ByteBuffer)null);
assertEquals(-1,endp.fill(buffer)); assertEquals(-1,endp.fill(buffer));
@ -96,7 +96,7 @@ public class ByteArrayEndPointTest
} }
endp.reset(); endp.reset();
endp.setInput("and more"); endp.addInput("and more");
buffer = BufferUtil.allocate(4); buffer = BufferUtil.allocate(4);
assertEquals(4,endp.fill(buffer)); assertEquals(4,endp.fill(buffer));
@ -154,7 +154,7 @@ public class ByteArrayEndPointTest
public void testReadable() throws Exception public void testReadable() throws Exception
{ {
ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000); ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, 5000);
endp.setInput("test input"); endp.addInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback fcb = new FutureCallback(); FutureCallback fcb = new FutureCallback();
@ -172,7 +172,7 @@ public class ByteArrayEndPointTest
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer)); assertEquals(0, endp.fill(buffer));
endp.setInput(" more"); endp.addInput(" more");
fcb.get(1000,TimeUnit.MILLISECONDS); fcb.get(1000,TimeUnit.MILLISECONDS);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals(null, fcb.get()); assertEquals(null, fcb.get());
@ -185,7 +185,7 @@ public class ByteArrayEndPointTest
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer)); assertEquals(0, endp.fill(buffer));
endp.setInput((ByteBuffer)null); endp.addInput((ByteBuffer)null);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals(null, fcb.get()); assertEquals(null, fcb.get());
assertEquals(-1, endp.fill(buffer)); assertEquals(-1, endp.fill(buffer));
@ -267,7 +267,7 @@ public class ByteArrayEndPointTest
{ {
long idleTimeout = 500; long idleTimeout = 500;
ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, idleTimeout); ByteArrayEndPoint endp = new ByteArrayEndPoint(_scheduler, idleTimeout);
endp.setInput("test"); endp.addInput("test");
endp.setGrowOutput(false); endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5)); endp.setOutput(BufferUtil.allocate(5));

View File

@ -782,6 +782,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override @Override
public String toString() public String toString()
{ {
return super.toString()+" "+BufferUtil.toDetailString(_requestBuffer); return super.toString()+"<--"+BufferUtil.toDetailString(_requestBuffer);
} }
} }

View File

@ -681,7 +681,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
} }
protected static class ErrorState extends EOFState protected class ErrorState extends EOFState
{ {
final Throwable _error; final Throwable _error;
ErrorState(Throwable error) ErrorState(Throwable error)

View File

@ -161,7 +161,7 @@ public class LocalConnector extends AbstractConnector
if (!isStarted()) if (!isStarted())
throw new IllegalStateException("!STARTED"); throw new IllegalStateException("!STARTED");
LocalEndPoint endp = new LocalEndPoint(); LocalEndPoint endp = new LocalEndPoint();
endp.setInput(rawRequest); endp.addInput(rawRequest);
_connects.add(endp); _connects.add(endp);
return endp; return endp;
} }
@ -196,14 +196,6 @@ public class LocalConnector extends AbstractConnector
getExecutor().execute(task); getExecutor().execute(task);
} }
public void addInput(String s)
{
// TODO this is a busy wait
while(getIn()==null || BufferUtil.hasContent(getIn()))
Thread.yield();
setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
}
@Override @Override
public void close() public void close()
{ {
@ -211,7 +203,6 @@ public class LocalConnector extends AbstractConnector
super.close(); super.close();
if (wasOpen) if (wasOpen)
{ {
// connectionClosed(getConnection());
getConnection().onClose(); getConnection().onClose();
onClose(); onClose();
} }

View File

@ -94,8 +94,13 @@ public class HttpInputIntegrationTest
__config = new HttpConfiguration(); __config = new HttpConfiguration();
__server = new Server(); __server = new Server();
__server.addConnector(new LocalConnector(__server,new HttpConnectionFactory(__config))); LocalConnector local=new LocalConnector(__server,new HttpConnectionFactory(__config));
__server.addConnector(new ServerConnector(__server,new HttpConnectionFactory(__config),new HTTP2CServerConnectionFactory(__config))); local.setIdleTimeout(4000);
__server.addConnector(local);
ServerConnector http = new ServerConnector(__server,new HttpConnectionFactory(__config),new HTTP2CServerConnectionFactory(__config));
http.setIdleTimeout(4000);
__server.addConnector(http);
// SSL Context Factory for HTTPS and HTTP/2 // SSL Context Factory for HTTPS and HTTP/2
@ -124,9 +129,9 @@ public class HttpInputIntegrationTest
SslConnectionFactory ssl = new SslConnectionFactory(__sslContextFactory,h1.getProtocol() /*TODO alpn.getProtocol()*/); SslConnectionFactory ssl = new SslConnectionFactory(__sslContextFactory,h1.getProtocol() /*TODO alpn.getProtocol()*/);
// HTTP/2 Connector // HTTP/2 Connector
ServerConnector http2Connector = ServerConnector http2 = new ServerConnector(__server,ssl,/*TODO alpn,h2,*/ h1);
new ServerConnector(__server,ssl,/*TODO alpn,h2,*/ h1); http2.setIdleTimeout(4000);
__server.addConnector(http2Connector); __server.addConnector(http2);
ServletContextHandler context = new ServletContextHandler(__server,"/ctx"); ServletContextHandler context = new ServletContextHandler(__server,"/ctx");
@ -174,7 +179,7 @@ public class HttpInputIntegrationTest
// + HTTP/2 // + HTTP/2
// + SSL + HTTP/2 // + SSL + HTTP/2
// + FASTCGI // + FASTCGI
for (Class<? extends TestClient> client : new Class[]{LocalClient.class,H1Client.class,H1SClient.class}) for (Class<? extends TestClient> client : new Class[]{/* TODO LocalClient.class,*/H1Client.class,H1SClient.class})
{ {
// test async actions that are run: // test async actions that are run: