More work on generation/parsing.

This commit is contained in:
Simone Bordet 2013-09-03 16:04:13 +02:00
parent 54accb3699
commit f546d59ea1
6 changed files with 284 additions and 42 deletions

View File

@ -76,8 +76,7 @@ public class ClientGenerator extends Generator
int maxCapacity = 4 + 4 + 2 * MAX_PARAM_LENGTH;
// One FCGI_BEGIN_REQUEST + N FCGI_PARAMS + one last FCGI_PARAMS
int numberOfFrames = 1 + (fieldsLength / maxCapacity + 1) + 1;
Result result = new Result(byteBufferPool, callback, numberOfFrames);
Result result = new Result(byteBufferPool, callback);
ByteBuffer beginRequestBuffer = byteBufferPool.acquire(16, false);
BufferUtil.clearToFill(beginRequestBuffer);

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.fcgi.generator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.io.ByteBufferPool;
@ -41,8 +43,7 @@ public class Generator
id &= 0xFF_FF;
int remaining = content == null ? 0 : content.remaining();
int frames = 2 * (remaining / MAX_CONTENT_LENGTH + 1) + (lastContent ? 1 : 0);
Result result = new Result(byteBufferPool, callback, frames);
Result result = new Result(byteBufferPool, callback);
while (remaining > 0 || lastContent)
{
@ -79,37 +80,34 @@ public class Generator
{
private final ByteBufferPool byteBufferPool;
private final Callback callback;
private final ByteBuffer[] buffers;
private final boolean[] recycles;
private int index;
private final List<ByteBuffer> buffers;
private final List<Boolean> recycles;
public Result(ByteBufferPool byteBufferPool, Callback callback, int frames)
public Result(ByteBufferPool byteBufferPool, Callback callback)
{
this(byteBufferPool, callback, new ByteBuffer[frames], new boolean[frames], 0);
this(byteBufferPool, callback, new ArrayList<ByteBuffer>(4), new ArrayList<Boolean>(4));
}
protected Result(Result that)
public Result(Result that)
{
this(that.byteBufferPool, that.callback, that.buffers, that.recycles, that.index);
this(that.byteBufferPool, that.callback, that.buffers, that.recycles);
}
private Result(ByteBufferPool byteBufferPool, Callback callback, ByteBuffer[] buffers, boolean[] recycles, int index)
private Result(ByteBufferPool byteBufferPool, Callback callback, List<ByteBuffer> buffers, List<Boolean> recycles)
{
this.byteBufferPool = byteBufferPool;
this.callback = callback;
this.buffers = buffers;
this.recycles = recycles;
this.index = index;
}
public void add(ByteBuffer buffer, boolean recycle)
{
buffers[index] = buffer;
recycles[index] = recycle;
++index;
buffers.add(buffer);
recycles.add(recycle);
}
public ByteBuffer[] getByteBuffers()
public List<ByteBuffer> getByteBuffers()
{
return buffers;
}
@ -130,10 +128,10 @@ public class Generator
protected void recycle()
{
for (int i = 0; i < buffers.length; ++i)
for (int i = 0; i < buffers.size(); ++i)
{
ByteBuffer buffer = buffers[i];
if (recycles[i])
ByteBuffer buffer = buffers.get(i);
if (recycles.get(i))
byteBufferPool.release(buffer);
}
}

View File

@ -94,4 +94,22 @@ public class ServerGenerator extends Generator
}
};
}
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, Callback callback)
{
Result result = generateContent(request, content, lastContent, callback, FCGI.FrameType.STDOUT);
if (lastContent)
{
// Generate the FCGI_END_REQUEST
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putLong(0x00L);
endRequestBuffer.flip();
result.add(endRequestBuffer, true);
}
return result;
}
}

View File

@ -28,7 +28,7 @@ public class ClientParser extends Parser
public ClientParser(Listener listener)
{
contentParsers.put(FCGI.FrameType.STDOUT, new ResponseContentParser(headerParser, FCGI.StreamType.STD_OUT, listener));
contentParsers.put(FCGI.FrameType.STDOUT, new ResponseContentParser(headerParser, listener));
contentParsers.put(FCGI.FrameType.STDERR, new StreamContentParser(headerParser, FCGI.StreamType.STD_ERR, listener));
contentParsers.put(FCGI.FrameType.END_REQUEST, new EndRequestContentParser(headerParser, listener));
}

View File

@ -27,9 +27,9 @@ import org.eclipse.jetty.http.HttpVersion;
public class ResponseContentParser extends StreamContentParser
{
public ResponseContentParser(HeaderParser headerParser, FCGI.StreamType streamType, Parser.Listener listener)
public ResponseContentParser(HeaderParser headerParser, Parser.Listener listener)
{
super(headerParser, streamType, new ResponseListener(headerParser, listener));
super(headerParser, FCGI.StreamType.STD_OUT, new ResponseListener(headerParser, listener));
}
private static class ResponseListener extends Parser.Listener.Adapter implements HttpParser.ResponseHandler<ByteBuffer>
@ -37,6 +37,7 @@ public class ResponseContentParser extends StreamContentParser
private final HeaderParser headerParser;
private final Parser.Listener listener;
private final FCGIHttpParser httpParser;
private State state = State.HEADERS;
public ResponseListener(HeaderParser headerParser, Parser.Listener listener)
{
@ -48,14 +49,41 @@ public class ResponseContentParser extends StreamContentParser
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
httpParser.parseHeaders(buffer);
while (buffer.hasRemaining())
{
switch (state)
{
case HEADERS:
{
if (httpParser.parseHeaders(buffer))
state = State.CONTENT;
break;
}
case CONTENT:
{
if (httpParser.parseContent(buffer))
reset();
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
private void reset()
{
httpParser.reset();
state = State.HEADERS;
}
@Override
public void onEnd(int request)
{
// TODO
throw new UnsupportedOperationException();
// Never called for STD_OUT, since it relies on FCGI_END_REQUEST
throw new IllegalStateException();
}
@Override
@ -68,6 +96,7 @@ public class ResponseContentParser extends StreamContentParser
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
// The HTTP request line does not exist in FCGI responses
throw new IllegalStateException();
}
@ -76,6 +105,13 @@ public class ResponseContentParser extends StreamContentParser
{
try
{
if ("Status".equalsIgnoreCase(field.getName()))
{
// Need to set the response status so the
// HttpParser can handle the content properly.
int code = Integer.parseInt(field.getValue().split(" ")[0]);
httpParser.setResponseStatus(code);
}
listener.onHeader(headerParser.getRequest(), field.getName(), field.getValue());
}
catch (Throwable x)
@ -96,45 +132,81 @@ public class ResponseContentParser extends StreamContentParser
{
logger.debug("Exception while invoking listener " + listener, x);
}
return false;
// Return from parsing so that we can parse the content
return true;
}
@Override
public boolean content(ByteBuffer item)
public boolean content(ByteBuffer buffer)
{
try
{
listener.onContent(headerParser.getRequest(), FCGI.StreamType.STD_OUT, buffer);
}
catch (Throwable x)
{
logger.debug("Exception while invoking listener " + listener, x);
}
return false;
}
@Override
public boolean messageComplete()
{
return false;
// Return from parsing so that we can parse the next headers
return true;
}
@Override
public void earlyEOF()
{
// TODO
}
@Override
public void badMessage(int status, String reason)
{
}
}
// Methods overridden to make them visible here
private static class FCGIHttpParser extends HttpParser
{
private FCGIHttpParser(ResponseHandler<ByteBuffer> handler)
{
super(handler, 65 * 1024, true);
setState(State.HEADER);
// TODO
}
@Override
protected boolean parseHeaders(ByteBuffer buffer)
// Methods overridden to make them visible here
private static class FCGIHttpParser extends HttpParser
{
return super.parseHeaders(buffer);
private FCGIHttpParser(ResponseHandler<ByteBuffer> handler)
{
super(handler, 65 * 1024, true);
reset();
}
@Override
public void reset()
{
super.reset();
setState(State.HEADER);
}
@Override
protected boolean parseHeaders(ByteBuffer buffer)
{
return super.parseHeaders(buffer);
}
@Override
protected boolean parseContent(ByteBuffer buffer)
{
return super.parseContent(buffer);
}
@Override
protected void setResponseStatus(int status)
{
super.setResponseStatus(status);
}
}
private enum State
{
HEADERS, CONTENT
}
}
}

View File

@ -19,8 +19,10 @@
package org.eclipse.jetty.fcgi.parser;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.Generator;
import org.eclipse.jetty.fcgi.generator.ServerGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
@ -91,4 +93,157 @@ public class ClientParserTest
Assert.assertEquals(value, params.get());
}
@Test
public void testParseNoResponseContent() throws Exception
{
final int id = 13;
Fields fields = new Fields();
final int code = 200;
final String contentTypeName = "Content-Length";
final String contentTypeValue = "0";
fields.put(contentTypeName, contentTypeValue);
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, null, true, null);
final AtomicInteger verifier = new AtomicInteger();
ClientParser parser = new ClientParser(new Parser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
verifier.addAndGet(2);
}
@Override
public void onEnd(int request)
{
Assert.assertEquals(id, request);
verifier.addAndGet(3);
}
});
for (ByteBuffer buffer : result1.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
for (ByteBuffer buffer : result2.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
Assert.assertEquals(3, verifier.get());
}
@Test
public void testParseSmallResponseContent() throws Exception
{
final int id = 13;
Fields fields = new Fields();
ByteBuffer content = ByteBuffer.wrap(new byte[1024]);
final int contentLength = content.remaining();
final int code = 200;
final String contentTypeName = "Content-Length";
final String contentTypeValue = String.valueOf(contentLength);
fields.put(contentTypeName, contentTypeValue);
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
final AtomicInteger verifier = new AtomicInteger();
ClientParser parser = new ClientParser(new Parser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
Assert.assertEquals(contentLength, buffer.remaining());
verifier.addAndGet(2);
}
@Override
public void onEnd(int request)
{
Assert.assertEquals(id, request);
verifier.addAndGet(3);
}
});
for (ByteBuffer buffer : result1.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
for (ByteBuffer buffer : result2.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
Assert.assertEquals(5, verifier.get());
}
@Test
public void testParseLargeResponseContent() throws Exception
{
final int id = 13;
Fields fields = new Fields();
ByteBuffer content = ByteBuffer.wrap(new byte[128 * 1024]);
final int contentLength = content.remaining();
final int code = 200;
final String contentTypeName = "Content-Length";
final String contentTypeValue = String.valueOf(contentLength);
fields.put(contentTypeName, contentTypeValue);
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
ServerGenerator generator = new ServerGenerator(byteBufferPool);
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
final AtomicInteger length = new AtomicInteger();
final AtomicBoolean verifier = new AtomicBoolean();
ClientParser parser = new ClientParser(new Parser.Listener.Adapter()
{
@Override
public void onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
Assert.assertEquals(id, request);
length.addAndGet(buffer.remaining());
}
@Override
public void onEnd(int request)
{
Assert.assertEquals(id, request);
Assert.assertEquals(contentLength, length.get());
verifier.set(true);
}
});
for (ByteBuffer buffer : result1.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
for (ByteBuffer buffer : result2.getByteBuffers())
{
parser.parse(buffer);
Assert.assertFalse(buffer.hasRemaining());
}
Assert.assertTrue(verifier.get());
}
}