Handling request and response content.

This commit is contained in:
Simone Bordet 2013-09-08 23:00:57 +02:00
parent 5427a6d2da
commit bfad01e0dc
10 changed files with 291 additions and 47 deletions

View File

@ -105,4 +105,29 @@ public class FCGI
{ {
STD_IN, STD_OUT, STD_ERR STD_IN, STD_OUT, STD_ERR
} }
public static class Headers
{
public static final String AUTH_TYPE = "AUTH_TYPE";
public static final String CONTENT_LENGTH = "CONTENT_LENGTH";
public static final String CONTENT_TYPE = "CONTENT_TYPE";
public static final String GATEWAY_INTERFACE = "GATEWAY_INTERFACE";
public static final String PATH_INFO = "PATH_INFO";
public static final String PATH_TRANSLATED = "PATH_TRANSLATED";
public static final String QUERY_STRING = "QUERY_STRING";
public static final String REMOTE_ADDR = "REMOTE_ADDR";
public static final String REMOTE_HOST = "REMOTE_HOST";
public static final String REMOTE_USER = "REMOTE_USER";
public static final String REQUEST_METHOD = "REQUEST_METHOD";
public static final String REQUEST_URI = "REQUEST_URI";
public static final String SCRIPT_NAME = "SCRIPT_NAME";
public static final String SERVER_NAME = "SERVER_NAME";
public static final String SERVER_PORT = "SERVER_PORT";
public static final String SERVER_PROTOCOL = "SERVER_PROTOCOL";
public static final String SERVER_SOFTWARE = "SERVER_SOFTWARE";
private Headers()
{
}
}
} }

View File

@ -26,9 +26,13 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class Flusher public class Flusher
{ {
private static final Logger LOG = Log.getLogger(Flusher.class);
private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>(); private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
private final Callback flushCallback = new FlushCallback(); private final Callback flushCallback = new FlushCallback();
private final EndPoint endPoint; private final EndPoint endPoint;
@ -52,6 +56,11 @@ public class Flusher
endPoint.write(flushCallback); endPoint.write(flushCallback);
} }
public void shutdown()
{
flush(new ShutdownResult());
}
private class FlushCallback extends IteratingCallback private class FlushCallback extends IteratingCallback
{ {
private Generator.Result active; private Generator.Result active;
@ -106,4 +115,30 @@ public class Flusher
} }
} }
} }
private class ShutdownResult extends Generator.Result
{
private ShutdownResult()
{
super(null, new Adapter());
}
@Override
public void succeeded()
{
shutdown();
}
@Override
public void failed(Throwable x)
{
shutdown();
}
private void shutdown()
{
LOG.debug("Shutting down {}", endPoint);
endPoint.shutdownOutput();
}
}
} }

View File

@ -27,9 +27,13 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class ResponseContentParser extends StreamContentParser public class ResponseContentParser extends StreamContentParser
{ {
private static final Logger LOG = Log.getLogger(ResponseContentParser.class);
public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener) public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener)
{ {
super(headerParser, FCGI.StreamType.STD_OUT, new ResponseListener(headerParser, listener)); super(headerParser, FCGI.StreamType.STD_OUT, new ResponseListener(headerParser, listener));
@ -54,6 +58,8 @@ public class ResponseContentParser extends StreamContentParser
@Override @Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{ {
LOG.debug("Request {} {} content {} {}", request, stream, state, buffer);
while (buffer.hasRemaining()) while (buffer.hasRemaining())
{ {
switch (state) switch (state)
@ -89,8 +95,9 @@ public class ResponseContentParser extends StreamContentParser
@Override @Override
public void onEnd(int request) public void onEnd(int request)
{ {
// Never called for STD_OUT, since it relies on FCGI_END_REQUEST // We are a STD_OUT stream so the end of the request is
throw new IllegalStateException(); // signaled by a END_REQUEST. Here we just reset the state.
reset();
} }
@Override @Override

View File

@ -80,8 +80,7 @@ public class StreamContentParser extends ContentParser
@Override @Override
public void noContent() public void noContent()
{ {
if (streamType == FCGI.StreamType.STD_IN) onEnd();
onEnd();
} }
protected void onContent(ByteBuffer buffer) protected void onContent(ByteBuffer buffer)

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.fcgi.client.http; package org.eclipse.jetty.fcgi.client.http;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
@ -95,6 +97,13 @@ public class HttpChannelOverFCGI extends HttpChannel
receiver.responseHeaders(exchange); receiver.responseHeaders(exchange);
} }
protected void content(ByteBuffer buffer)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
receiver.responseContent(exchange, buffer);
}
protected void responseSuccess() protected void responseSuccess()
{ {
HttpExchange exchange = getHttpExchange(); HttpExchange exchange = getHttpExchange();

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -133,7 +134,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
private void shutdown() private void shutdown()
{ {
// TODO: we must signal to the HttpParser that we are at EOF getEndPoint().shutdownOutput();
} }
@Override @Override
@ -237,17 +238,36 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override @Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{ {
throw new UnsupportedOperationException(); switch (stream)
{
case STD_OUT:
{
HttpChannelOverFCGI channel = channels.get(request);
if (channel != null)
channel.content(buffer);
else
noChannel(request);
break;
}
case STD_ERR:
{
LOG.info(BufferUtil.toUTF8String(buffer));
break;
}
default:
{
throw new IllegalArgumentException();
}
}
} }
@Override @Override
public void onEnd(int request) public void onEnd(int request)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = channels.remove(request);
if (channel != null) if (channel != null)
{ {
channel.responseSuccess(); channel.responseSuccess();
channels.remove(request);
releaseRequest(request); releaseRequest(request);
} }
else else

View File

@ -1,11 +1,18 @@
package org.eclipse.jetty.fcgi.client.http; package org.eclipse.jetty.fcgi.client.http;
import java.net.URI;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpContent; import org.eclipse.jetty.client.HttpContent;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.ClientGenerator; import org.eclipse.jetty.fcgi.generator.ClientGenerator;
import org.eclipse.jetty.fcgi.generator.Generator; import org.eclipse.jetty.fcgi.generator.Generator;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
public class HttpSenderOverFCGI extends HttpSender public class HttpSenderOverFCGI extends HttpSender
@ -27,9 +34,44 @@ public class HttpSenderOverFCGI extends HttpSender
@Override @Override
protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback) protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
{ {
Request httpRequest = exchange.getRequest();
URI uri = httpRequest.getURI();
HttpFields headers = httpRequest.getHeaders();
HttpField field = headers.remove(HttpHeader.AUTHORIZATION);
if (field != null)
headers.put(FCGI.Headers.AUTH_TYPE, field.getValue());
field = headers.remove(HttpHeader.CONTENT_LENGTH);
if (field != null)
headers.put(FCGI.Headers.CONTENT_LENGTH, field.getValue());
field = headers.remove(HttpHeader.CONTENT_TYPE);
if (field != null)
headers.put(FCGI.Headers.CONTENT_TYPE, field.getValue());
headers.put(FCGI.Headers.GATEWAY_INTERFACE, "CGI/1.1");
// headers.put(Headers.PATH_INFO, ???);
// headers.put(Headers.PATH_TRANSLATED, ???);
headers.put(FCGI.Headers.QUERY_STRING, uri.getQuery());
// headers.put(Headers.REMOTE_ADDR, ???);
// headers.put(Headers.REMOTE_HOST, ???);
// headers.put(Headers.REMOTE_USER, ???);
headers.put(FCGI.Headers.REQUEST_METHOD, httpRequest.getMethod());
headers.put(FCGI.Headers.REQUEST_URI, uri.toString());
headers.put(FCGI.Headers.SERVER_PROTOCOL, httpRequest.getVersion().asString());
// TODO: translate remaining HTTP header into the HTTP_* format
int request = getHttpChannel().getRequest(); int request = getHttpChannel().getRequest();
boolean noContent = !content.hasContent(); boolean noContent = !content.hasContent();
Generator.Result result = generator.generateRequestHeaders(request, exchange.getRequest().getHeaders(), Generator.Result result = generator.generateRequestHeaders(request, headers,
noContent ? new Callback.Adapter() : callback); noContent ? new Callback.Adapter() : callback);
getHttpChannel().flush(result); getHttpChannel().flush(result);
if (noContent) if (noContent)

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.IO;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -50,10 +51,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{ {
start(new EmptyServerHandler()); start(new EmptyServerHandler());
Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); for (int i = 0; i < 2; ++i)
{
Assert.assertNotNull(response); Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort());
Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
}
} }
@Test @Test
@ -70,12 +73,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest
} }
}); });
ContentResponse response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); for (int i = 0; i < 2; ++i)
{
Assert.assertNotNull(response); ContentResponse response = client.GET(scheme + "://localhost:" + connector.getLocalPort());
Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response);
byte[] content = response.getContent(); Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, content); byte[] content = response.getContent();
Assert.assertArrayEquals(data, content);
}
} }
@Test @Test
@ -226,7 +231,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{ {
response.setCharacterEncoding("UTF-8"); response.setCharacterEncoding("UTF-8");
response.setContentType("application/octet-stream"); response.setContentType("application/octet-stream");
response.getOutputStream().write(content); IO.copy(request.getInputStream(), response.getOutputStream());
} }
} }
}); });
@ -234,7 +239,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?b=1") ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?b=1")
.param(paramName, paramValue) .param(paramName, paramValue)
.content(new BytesContentProvider(content)) .content(new BytesContentProvider(content))
.timeout(555, TimeUnit.SECONDS) .timeout(5, TimeUnit.SECONDS)
.send(); .send();
Assert.assertNotNull(response); Assert.assertNotNull(response);

View File

@ -23,7 +23,10 @@ import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -38,10 +41,8 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverFCGI extends HttpChannel<ByteBuffer> public class HttpChannelOverFCGI extends HttpChannel<ByteBuffer>
{ {
private static final Logger LOG = Log.getLogger(HttpChannelOverFCGI.class); private static final Logger LOG = Log.getLogger(HttpChannelOverFCGI.class);
private static final String METHOD_HEADER = "REQUEST_METHOD";
private static final String URI_HEADER = "REQUEST_URI";
private static final String VERSION_HEADER = "SERVER_PROTOCOL";
private final Dispatcher dispatcher;
private String method; private String method;
private String uri; private String uri;
private String version; private String version;
@ -51,25 +52,24 @@ public class HttpChannelOverFCGI extends HttpChannel<ByteBuffer>
public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input) public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{ {
super(connector, configuration, endPoint, transport, input); super(connector, configuration, endPoint, transport, input);
this.dispatcher = new Dispatcher(connector.getExecutor(), this);
} }
public void header(HttpField field) protected void header(HttpField field)
{ {
LOG.debug("FCGI header {}", field); if (FCGI.Headers.REQUEST_METHOD.equalsIgnoreCase(field.getName()))
if (METHOD_HEADER.equalsIgnoreCase(field.getName()))
{ {
method = field.getValue(); method = field.getValue();
if (uri != null && version != null) if (uri != null && version != null)
startRequest(); startRequest();
} }
else if (URI_HEADER.equalsIgnoreCase(field.getName())) else if (FCGI.Headers.REQUEST_URI.equalsIgnoreCase(field.getName()))
{ {
uri = field.getValue(); uri = field.getValue();
if (method != null && version != null) if (method != null && version != null)
startRequest(); startRequest();
} }
else if (VERSION_HEADER.equalsIgnoreCase(field.getName())) else if (FCGI.Headers.SERVER_PROTOCOL.equalsIgnoreCase(field.getName()))
{ {
version = field.getValue(); version = field.getValue();
if (method != null && uri != null) if (method != null && uri != null)
@ -126,12 +126,104 @@ public class HttpChannelOverFCGI extends HttpChannel<ByteBuffer>
} }
field = new HttpField(httpName.toString(), field.getValue()); field = new HttpField(httpName.toString(), field.getValue());
} }
LOG.debug("HTTP header {}", field);
parsedHeader(field); parsedHeader(field);
} }
public void dispatch() @Override
public boolean headerComplete()
{ {
getConnector().getExecutor().execute(this); boolean result = super.headerComplete();
started = false;
return result;
}
protected void dispatch()
{
dispatcher.dispatch();
}
private static class Dispatcher implements Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private final Executor executor;
private final Runnable runnable;
private Dispatcher(Executor executor, Runnable runnable)
{
this.executor = executor;
this.runnable = runnable;
}
public void dispatch()
{
while (true)
{
State current = state.get();
switch (current)
{
case IDLE:
{
if (!state.compareAndSet(current, State.DISPATCH))
continue;
executor.execute(this);
return;
}
case DISPATCH:
case EXECUTE:
{
if (state.compareAndSet(current, State.SCHEDULE))
return;
continue;
}
case SCHEDULE:
{
return;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@Override
public void run()
{
while (true)
{
State current = state.get();
switch (current)
{
case DISPATCH:
{
if (state.compareAndSet(current, State.EXECUTE))
runnable.run();
continue;
}
case EXECUTE:
{
if (state.compareAndSet(current, State.IDLE))
return;
continue;
}
case SCHEDULE:
{
if (state.compareAndSet(current, State.DISPATCH))
continue;
throw new IllegalStateException();
}
default:
{
throw new IllegalStateException();
}
}
}
}
private enum State
{
IDLE, DISPATCH, EXECUTE, SCHEDULE
}
} }
} }

View File

@ -109,7 +109,7 @@ public class ServerFCGIConnection extends AbstractConnection
private void shutdown() private void shutdown()
{ {
// TODO flusher.shutdown();
} }
private class ServerListener implements ServerParser.Listener private class ServerListener implements ServerParser.Listener
@ -122,47 +122,57 @@ public class ServerFCGIConnection extends AbstractConnection
HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel); HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel);
if (existing != null) if (existing != null)
throw new IllegalStateException(); throw new IllegalStateException();
if (LOG.isDebugEnabled())
LOG.debug("Request {} start on {}", request, channel);
} }
@Override @Override
public void onHeader(int request, HttpField field) public void onHeader(int request, HttpField field)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} header {} on {}", request, field, channel);
if (channel != null) if (channel != null)
channel.header(field); channel.header(field);
else
noChannel(request);
} }
@Override @Override
public void onHeaders(int request) public void onHeaders(int request)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} headers on {}", request, channel);
if (channel != null) if (channel != null)
channel.headerComplete(); {
else if (channel.headerComplete())
noChannel(request); channel.dispatch();
}
} }
@Override @Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{ {
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel);
if (channel != null)
{
if (channel.content(buffer))
channel.dispatch();
}
} }
@Override @Override
public void onEnd(int request) public void onEnd(int request)
{ {
HttpChannelOverFCGI channel = channels.get(request); HttpChannelOverFCGI channel = channels.remove(request);
if (LOG.isDebugEnabled())
LOG.debug("Request {} end on {}", request, channel);
if (channel != null) if (channel != null)
{
if (channel.messageComplete()) if (channel.messageComplete())
channel.dispatch(); channel.dispatch();
else }
noChannel(request);
}
private void noChannel(int request)
{
// TODO
} }
} }
} }