refactored HttpConnector for EWYK - work in progress

This commit is contained in:
Greg Wilkins 2014-12-23 17:15:27 +01:00
parent 023c593d35
commit 3f59bc4c14
14 changed files with 91 additions and 36 deletions

View File

@ -138,7 +138,7 @@ public class HttpTransportOverFCGI implements HttpTransport
}
@Override
public void completed()
public void onCompleted()
{
}
}

View File

@ -176,7 +176,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
@Override
public void completed()
public void onCompleted()
{
}

View File

@ -41,6 +41,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private final Runnable _runFillable = new Runnable()
{
@Override
public void run()
{
getFillInterest().fillable();
}
};
protected volatile ByteBuffer _in;
protected volatile ByteBuffer _out;
protected volatile boolean _ishut;
@ -110,6 +119,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint
// Don't need to do anything here as takeOutput does the signalling.
}
/* ------------------------------------------------------------ */
protected void execute(Runnable task)
{
new Thread(task,"BAEPoint-"+Integer.toHexString(hashCode()));
}
/* ------------------------------------------------------------ */
@Override
protected void needsFillInterest() throws IOException
@ -117,14 +132,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
if (_closed)
throw new ClosedChannelException();
if (BufferUtil.hasContent(_in) || _in==null)
getScheduler().schedule(new Runnable()
{
public void run()
{
if (!_closed && _in!=null)
getFillInterest().fillable();
}
},1,TimeUnit.MILLISECONDS);
execute(_runFillable);
}
/* ------------------------------------------------------------ */

View File

@ -549,7 +549,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (_requestLog!=null )
_requestLog.log(_request,_committedMetaData==null?-1:_committedMetaData.getStatus(), _written);
_transport.completed();
_transport.onCompleted();
}
public void onEarlyEOF()

View File

@ -330,7 +330,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public void completed()
public void onCompleted()
{
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)

View File

@ -31,7 +31,7 @@ public interface HttpTransport
void push(MetaData.Request request);
void completed();
void onCompleted();
/**
* Aborts this transport.

View File

@ -191,6 +191,11 @@ public class LocalConnector extends AbstractConnector
setGrowOutput(true);
}
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
public void addInput(String s)
{
// TODO this is a busy wait

View File

@ -986,8 +986,8 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
String in = new String(b, 0, i, StandardCharsets.UTF_8);
assertTrue(in.contains("123456789"));
assertTrue(in.contains("abcdefghZ"));
assertThat(in,containsString("123456789"));
assertThat(in,containsString("abcdefghZ"));
assertFalse(in.contains("Wibble"));
in = new String(b, i, b.length - i, StandardCharsets.UTF_16);
@ -1086,9 +1086,9 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
os.flush();
String in = IO.toString(is);
assertTrue(in.contains("123456789"));
assertTrue(in.contains("abcdefghi"));
assertTrue(in.contains("Wibble"));
assertThat(in,containsString("123456789"));
assertThat(in,containsString("abcdefghi"));
assertThat(in,containsString("Wibble"));
}
}
@ -1154,7 +1154,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
assertEquals(-1, is.read()); // Closed by error!
assertTrue(in.contains("HTTP/1.1 200 OK"));
assertThat(in,containsString("HTTP/1.1 200 OK"));
assertTrue(in.indexOf("Transfer-Encoding: chunked") > 0);
assertTrue(in.indexOf("Now is the time for all good men to come to the aid of the party") > 0);
assertThat(in, Matchers.not(Matchers.containsString("\r\n0\r\n")));

View File

@ -101,7 +101,7 @@ public class ResponseTest
}
@Override
public void completed()
public void onCompleted()
{
}

View File

@ -171,42 +171,68 @@ public class StatisticsHandlerTest
assertEquals(0, _statsHandler.getAsyncDispatches());
assertEquals(0, _statsHandler.getExpires());
assertEquals(2, _statsHandler.getResponses2xx());
}
@Test
public void testTwoRequests() throws Exception
{
final CyclicBarrier barrier[] = {new CyclicBarrier(3), new CyclicBarrier(3)};
_latchHandler.reset(2);
barrier[0] = new CyclicBarrier(3);
barrier[1] = new CyclicBarrier(3);
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
{
request.setHandled(true);
try
{
barrier[0].await();
barrier[1].await();
}
catch (Exception x)
{
Thread.currentThread().interrupt();
throw (IOException)new IOException().initCause(x);
}
}
});
_server.start();
String request = "GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
_connector.executeRequest(request);
_connector.executeRequest(request);
barrier[0].await();
assertEquals(4, _statistics.getConnectionsOpen());
assertEquals(2, _statistics.getConnectionsOpen());
assertEquals(4, _statsHandler.getRequests());
assertEquals(2, _statsHandler.getRequests());
assertEquals(2, _statsHandler.getRequestsActive());
assertEquals(2, _statsHandler.getRequestsActiveMax());
assertEquals(4, _statsHandler.getDispatched());
assertEquals(2, _statsHandler.getDispatched());
assertEquals(2, _statsHandler.getDispatchedActive());
assertEquals(2, _statsHandler.getDispatchedActiveMax());
barrier[1].await();
assertTrue(_latchHandler.await());
assertEquals(4, _statsHandler.getRequests());
assertEquals(2, _statsHandler.getRequests());
assertEquals(0, _statsHandler.getRequestsActive());
assertEquals(2, _statsHandler.getRequestsActiveMax());
assertEquals(4, _statsHandler.getDispatched());
assertEquals(2, _statsHandler.getDispatched());
assertEquals(0, _statsHandler.getDispatchedActive());
assertEquals(2, _statsHandler.getDispatchedActiveMax());
assertEquals(0, _statsHandler.getAsyncRequests());
assertEquals(0, _statsHandler.getAsyncDispatches());
assertEquals(0, _statsHandler.getExpires());
assertEquals(4, _statsHandler.getResponses2xx());
assertEquals(2, _statsHandler.getResponses2xx());
}
@Test

View File

@ -1,3 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.server.LEVEL=DEBUG

View File

@ -224,7 +224,7 @@ public class HttpTransportOverSPDY implements HttpTransport
}
@Override
public void completed()
public void onCompleted()
{
if (LOG.isDebugEnabled())
LOG.debug("Completed {}", this);
@ -264,7 +264,7 @@ public class HttpTransportOverSPDY implements HttpTransport
}
@Override
public void completed()
public void onCompleted()
{
Stream stream = getStream();
if (LOG.isDebugEnabled())

View File

@ -141,12 +141,12 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
}
@Override
public void completed()
public void onCompleted()
{
headers.clear();
stream = null;
content = null;
super.completed();
super.onCompleted();
}
@Override
@ -275,7 +275,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
});
if (replyInfo.isClose())
completed();
onCompleted();
handler.succeeded();
}
@ -311,7 +311,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
});
if (dataInfo.isClose())
completed();
onCompleted();
handler.succeeded();
}

View File

@ -21,6 +21,9 @@ package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>An {@link ExecutionStrategy} executes {@link Runnable} tasks produced by a {@link Producer}.
* The strategy to execute the task may vary depending on the implementation; the task may be
@ -59,6 +62,7 @@ public interface ExecutionStrategy
*/
public static class ProduceExecuteRun implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private final Producer _producer;
private final Executor _executor;
@ -76,6 +80,8 @@ public interface ExecutionStrategy
{
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} PER produced {}",_producer,task);
if (task == null)
break;
@ -107,6 +113,7 @@ public interface ExecutionStrategy
*/
public static class ExecuteProduceRun implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
private final Producer _producer;
private final Executor _executor;
@ -140,6 +147,8 @@ public interface ExecutionStrategy
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR executed",_producer);
// A new thread has arrived, so clear the PENDING
// flag and try to set the PRODUCING flag.
if (!clearPendingTryProducing())
@ -147,8 +156,13 @@ public interface ExecutionStrategy
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR producing",_producer);
// If we got here, then we are the thread that is producing.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} EPR produced {}",_producer,task);
// If no task was produced...
if (task == null)
@ -163,6 +177,8 @@ public interface ExecutionStrategy
// and try to set the PENDING flag.
if (clearProducingTryPending())
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR executed self",_producer);
// Spawn a new thread to continue production.
_executor.execute(this);
}