From bfad01e0dc59cc7c74005e7b46e168cb44861225 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Sun, 8 Sep 2013 23:00:57 +0200 Subject: [PATCH] Handling request and response content. --- .../java/org/eclipse/jetty/fcgi/FCGI.java | 25 ++++ .../eclipse/jetty/fcgi/generator/Flusher.java | 35 ++++++ .../fcgi/parser/ResponseContentParser.java | 11 +- .../fcgi/parser/StreamContentParser.java | 3 +- .../fcgi/client/http/HttpChannelOverFCGI.java | 9 ++ .../client/http/HttpConnectionOverFCGI.java | 28 ++++- .../fcgi/client/http/HttpSenderOverFCGI.java | 44 ++++++- .../fcgi/client/http/HttpClientTest.java | 29 +++-- .../fcgi/server/HttpChannelOverFCGI.java | 116 ++++++++++++++++-- .../fcgi/server/ServerFCGIConnection.java | 38 +++--- 10 files changed, 291 insertions(+), 47 deletions(-) diff --git a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/FCGI.java b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/FCGI.java index 10cf644db6c..aae8d6c351c 100644 --- a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/FCGI.java +++ b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/FCGI.java @@ -105,4 +105,29 @@ public class FCGI { STD_IN, STD_OUT, STD_ERR } + + public static class Headers + { + public static final String AUTH_TYPE = "AUTH_TYPE"; + public static final String CONTENT_LENGTH = "CONTENT_LENGTH"; + public static final String CONTENT_TYPE = "CONTENT_TYPE"; + public static final String GATEWAY_INTERFACE = "GATEWAY_INTERFACE"; + public static final String PATH_INFO = "PATH_INFO"; + public static final String PATH_TRANSLATED = "PATH_TRANSLATED"; + public static final String QUERY_STRING = "QUERY_STRING"; + public static final String REMOTE_ADDR = "REMOTE_ADDR"; + public static final String REMOTE_HOST = "REMOTE_HOST"; + public static final String REMOTE_USER = "REMOTE_USER"; + public static final String REQUEST_METHOD = "REQUEST_METHOD"; + public static final String REQUEST_URI = "REQUEST_URI"; + public static final String SCRIPT_NAME = "SCRIPT_NAME"; + public static final String SERVER_NAME = "SERVER_NAME"; + public static final String SERVER_PORT = "SERVER_PORT"; + public static final String SERVER_PROTOCOL = "SERVER_PROTOCOL"; + public static final String SERVER_SOFTWARE = "SERVER_SOFTWARE"; + + private Headers() + { + } + } } diff --git a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index ccf868b3d54..8fd8b7252be 100644 --- a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -26,9 +26,13 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; public class Flusher { + private static final Logger LOG = Log.getLogger(Flusher.class); + private final Queue queue = new ConcurrentArrayQueue<>(); private final Callback flushCallback = new FlushCallback(); private final EndPoint endPoint; @@ -52,6 +56,11 @@ public class Flusher endPoint.write(flushCallback); } + public void shutdown() + { + flush(new ShutdownResult()); + } + private class FlushCallback extends IteratingCallback { private Generator.Result active; @@ -106,4 +115,30 @@ public class Flusher } } } + + private class ShutdownResult extends Generator.Result + { + private ShutdownResult() + { + super(null, new Adapter()); + } + + @Override + public void succeeded() + { + shutdown(); + } + + @Override + public void failed(Throwable x) + { + shutdown(); + } + + private void shutdown() + { + LOG.debug("Shutting down {}", endPoint); + endPoint.shutdownOutput(); + } + } } diff --git a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index 06ffe253a92..5a72638bfd5 100644 --- a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -27,9 +27,13 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; public class ResponseContentParser extends StreamContentParser { + private static final Logger LOG = Log.getLogger(ResponseContentParser.class); + public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener) { super(headerParser, FCGI.StreamType.STD_OUT, new ResponseListener(headerParser, listener)); @@ -54,6 +58,8 @@ public class ResponseContentParser extends StreamContentParser @Override public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { + LOG.debug("Request {} {} content {} {}", request, stream, state, buffer); + while (buffer.hasRemaining()) { switch (state) @@ -89,8 +95,9 @@ public class ResponseContentParser extends StreamContentParser @Override public void onEnd(int request) { - // Never called for STD_OUT, since it relies on FCGI_END_REQUEST - throw new IllegalStateException(); + // We are a STD_OUT stream so the end of the request is + // signaled by a END_REQUEST. Here we just reset the state. + reset(); } @Override diff --git a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java index 79f0ed3fe7b..10c15be663e 100644 --- a/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java +++ b/fcgi-core/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java @@ -80,8 +80,7 @@ public class StreamContentParser extends ContentParser @Override public void noContent() { - if (streamType == FCGI.StreamType.STD_IN) - onEnd(); + onEnd(); } protected void onContent(ByteBuffer buffer) diff --git a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index 83d8584e61c..ad095fe17e7 100644 --- a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.fcgi.client.http; +import java.nio.ByteBuffer; + import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; @@ -95,6 +97,13 @@ public class HttpChannelOverFCGI extends HttpChannel receiver.responseHeaders(exchange); } + protected void content(ByteBuffer buffer) + { + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + receiver.responseContent(exchange, buffer); + } + protected void responseSuccess() { HttpExchange exchange = getHttpExchange(); diff --git a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 108b2ba19a0..b0fed4dce00 100644 --- a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -133,7 +134,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private void shutdown() { - // TODO: we must signal to the HttpParser that we are at EOF + getEndPoint().shutdownOutput(); } @Override @@ -237,17 +238,36 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec @Override public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { - throw new UnsupportedOperationException(); + switch (stream) + { + case STD_OUT: + { + HttpChannelOverFCGI channel = channels.get(request); + if (channel != null) + channel.content(buffer); + else + noChannel(request); + break; + } + case STD_ERR: + { + LOG.info(BufferUtil.toUTF8String(buffer)); + break; + } + default: + { + throw new IllegalArgumentException(); + } + } } @Override public void onEnd(int request) { - HttpChannelOverFCGI channel = channels.get(request); + HttpChannelOverFCGI channel = channels.remove(request); if (channel != null) { channel.responseSuccess(); - channels.remove(request); releaseRequest(request); } else diff --git a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java index 1a8d853220c..b1a715117c3 100644 --- a/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java +++ b/fcgi-http-client-transport/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpSenderOverFCGI.java @@ -1,11 +1,18 @@ package org.eclipse.jetty.fcgi.client.http; +import java.net.URI; + import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpContent; import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpSender; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.fcgi.generator.ClientGenerator; import org.eclipse.jetty.fcgi.generator.Generator; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.util.Callback; public class HttpSenderOverFCGI extends HttpSender @@ -27,9 +34,44 @@ public class HttpSenderOverFCGI extends HttpSender @Override protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback) { + Request httpRequest = exchange.getRequest(); + URI uri = httpRequest.getURI(); + HttpFields headers = httpRequest.getHeaders(); + + HttpField field = headers.remove(HttpHeader.AUTHORIZATION); + if (field != null) + headers.put(FCGI.Headers.AUTH_TYPE, field.getValue()); + + field = headers.remove(HttpHeader.CONTENT_LENGTH); + if (field != null) + headers.put(FCGI.Headers.CONTENT_LENGTH, field.getValue()); + + field = headers.remove(HttpHeader.CONTENT_TYPE); + if (field != null) + headers.put(FCGI.Headers.CONTENT_TYPE, field.getValue()); + + headers.put(FCGI.Headers.GATEWAY_INTERFACE, "CGI/1.1"); + +// headers.put(Headers.PATH_INFO, ???); +// headers.put(Headers.PATH_TRANSLATED, ???); + + headers.put(FCGI.Headers.QUERY_STRING, uri.getQuery()); + +// headers.put(Headers.REMOTE_ADDR, ???); +// headers.put(Headers.REMOTE_HOST, ???); +// headers.put(Headers.REMOTE_USER, ???); + + headers.put(FCGI.Headers.REQUEST_METHOD, httpRequest.getMethod()); + + headers.put(FCGI.Headers.REQUEST_URI, uri.toString()); + + headers.put(FCGI.Headers.SERVER_PROTOCOL, httpRequest.getVersion().asString()); + + // TODO: translate remaining HTTP header into the HTTP_* format + int request = getHttpChannel().getRequest(); boolean noContent = !content.hasContent(); - Generator.Result result = generator.generateRequestHeaders(request, exchange.getRequest().getHeaders(), + Generator.Result result = generator.generateRequestHeaders(request, headers, noContent ? new Callback.Adapter() : callback); getHttpChannel().flush(result); if (noContent) diff --git a/fcgi-http-client-transport/src/test/java/org/eclipse/jetty/fcgi/client/http/HttpClientTest.java b/fcgi-http-client-transport/src/test/java/org/eclipse/jetty/fcgi/client/http/HttpClientTest.java index b7562e3de8b..fd5b0b1a76d 100644 --- a/fcgi-http-client-transport/src/test/java/org/eclipse/jetty/fcgi/client/http/HttpClientTest.java +++ b/fcgi-http-client-transport/src/test/java/org/eclipse/jetty/fcgi/client/http/HttpClientTest.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.IO; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.junit.Assert; import org.junit.Test; @@ -50,10 +51,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest { start(new EmptyServerHandler()); - Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); - - Assert.assertNotNull(response); - Assert.assertEquals(200, response.getStatus()); + for (int i = 0; i < 2; ++i) + { + Response response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.getStatus()); + } } @Test @@ -70,12 +73,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest } }); - ContentResponse response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); - - Assert.assertNotNull(response); - Assert.assertEquals(200, response.getStatus()); - byte[] content = response.getContent(); - Assert.assertArrayEquals(data, content); + for (int i = 0; i < 2; ++i) + { + ContentResponse response = client.GET(scheme + "://localhost:" + connector.getLocalPort()); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.getStatus()); + byte[] content = response.getContent(); + Assert.assertArrayEquals(data, content); + } } @Test @@ -226,7 +231,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest { response.setCharacterEncoding("UTF-8"); response.setContentType("application/octet-stream"); - response.getOutputStream().write(content); + IO.copy(request.getInputStream(), response.getOutputStream()); } } }); @@ -234,7 +239,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?b=1") .param(paramName, paramValue) .content(new BytesContentProvider(content)) - .timeout(555, TimeUnit.SECONDS) + .timeout(5, TimeUnit.SECONDS) .send(); Assert.assertNotNull(response); diff --git a/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index bbb858b1b02..1f9c060ef7e 100644 --- a/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -23,7 +23,10 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.EndPoint; @@ -38,10 +41,8 @@ import org.eclipse.jetty.util.log.Logger; public class HttpChannelOverFCGI extends HttpChannel { private static final Logger LOG = Log.getLogger(HttpChannelOverFCGI.class); - private static final String METHOD_HEADER = "REQUEST_METHOD"; - private static final String URI_HEADER = "REQUEST_URI"; - private static final String VERSION_HEADER = "SERVER_PROTOCOL"; + private final Dispatcher dispatcher; private String method; private String uri; private String version; @@ -51,25 +52,24 @@ public class HttpChannelOverFCGI extends HttpChannel public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input) { super(connector, configuration, endPoint, transport, input); + this.dispatcher = new Dispatcher(connector.getExecutor(), this); } - public void header(HttpField field) + protected void header(HttpField field) { - LOG.debug("FCGI header {}", field); - - if (METHOD_HEADER.equalsIgnoreCase(field.getName())) + if (FCGI.Headers.REQUEST_METHOD.equalsIgnoreCase(field.getName())) { method = field.getValue(); if (uri != null && version != null) startRequest(); } - else if (URI_HEADER.equalsIgnoreCase(field.getName())) + else if (FCGI.Headers.REQUEST_URI.equalsIgnoreCase(field.getName())) { uri = field.getValue(); if (method != null && version != null) startRequest(); } - else if (VERSION_HEADER.equalsIgnoreCase(field.getName())) + else if (FCGI.Headers.SERVER_PROTOCOL.equalsIgnoreCase(field.getName())) { version = field.getValue(); if (method != null && uri != null) @@ -126,12 +126,104 @@ public class HttpChannelOverFCGI extends HttpChannel } field = new HttpField(httpName.toString(), field.getValue()); } - LOG.debug("HTTP header {}", field); parsedHeader(field); } - public void dispatch() + @Override + public boolean headerComplete() { - getConnector().getExecutor().execute(this); + boolean result = super.headerComplete(); + started = false; + return result; + } + + protected void dispatch() + { + dispatcher.dispatch(); + } + + private static class Dispatcher implements Runnable + { + private final AtomicReference state = new AtomicReference<>(State.IDLE); + private final Executor executor; + private final Runnable runnable; + + private Dispatcher(Executor executor, Runnable runnable) + { + this.executor = executor; + this.runnable = runnable; + } + + public void dispatch() + { + while (true) + { + State current = state.get(); + switch (current) + { + case IDLE: + { + if (!state.compareAndSet(current, State.DISPATCH)) + continue; + executor.execute(this); + return; + } + case DISPATCH: + case EXECUTE: + { + if (state.compareAndSet(current, State.SCHEDULE)) + return; + continue; + } + case SCHEDULE: + { + return; + } + default: + { + throw new IllegalStateException(); + } + } + } + } + + @Override + public void run() + { + while (true) + { + State current = state.get(); + switch (current) + { + case DISPATCH: + { + if (state.compareAndSet(current, State.EXECUTE)) + runnable.run(); + continue; + } + case EXECUTE: + { + if (state.compareAndSet(current, State.IDLE)) + return; + continue; + } + case SCHEDULE: + { + if (state.compareAndSet(current, State.DISPATCH)) + continue; + throw new IllegalStateException(); + } + default: + { + throw new IllegalStateException(); + } + } + } + } + + private enum State + { + IDLE, DISPATCH, EXECUTE, SCHEDULE + } } } diff --git a/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index 13783d253e5..4f1bba8ddfd 100644 --- a/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -109,7 +109,7 @@ public class ServerFCGIConnection extends AbstractConnection private void shutdown() { - // TODO + flusher.shutdown(); } private class ServerListener implements ServerParser.Listener @@ -122,47 +122,57 @@ public class ServerFCGIConnection extends AbstractConnection HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel); if (existing != null) throw new IllegalStateException(); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} start on {}", request, channel); } @Override public void onHeader(int request, HttpField field) { HttpChannelOverFCGI channel = channels.get(request); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} header {} on {}", request, field, channel); if (channel != null) channel.header(field); - else - noChannel(request); } @Override public void onHeaders(int request) { HttpChannelOverFCGI channel = channels.get(request); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} headers on {}", request, channel); if (channel != null) - channel.headerComplete(); - else - noChannel(request); + { + if (channel.headerComplete()) + channel.dispatch(); + } } @Override public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { + HttpChannelOverFCGI channel = channels.get(request); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel); + if (channel != null) + { + if (channel.content(buffer)) + channel.dispatch(); + } } @Override public void onEnd(int request) { - HttpChannelOverFCGI channel = channels.get(request); + HttpChannelOverFCGI channel = channels.remove(request); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} end on {}", request, channel); if (channel != null) + { if (channel.messageComplete()) channel.dispatch(); - else - noChannel(request); - } - - private void noChannel(int request) - { - // TODO + } } } }