added direct buffer configuration

This commit is contained in:
Greg Wilkins 2015-02-05 10:02:01 +11:00
parent 96ebeed9d9
commit f6cfe07a69
17 changed files with 123 additions and 81 deletions

View File

@ -39,11 +39,14 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class GzipHandlerTest public class GzipHandlerTest
{ {
private static String __content = private static String __content =

View File

@ -48,6 +48,12 @@ public class HttpTransportOverFCGI implements HttpTransport
this.request = request; this.request = request;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
@Override @Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback) public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
{ {

View File

@ -54,6 +54,14 @@ public class HttpTransportOverHTTP2 implements HttpTransport
this.connection = connection; this.connection = connection;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
// Because sent buffers are passed directly to the endpoint without
// copying we can defer to the endpoint
return connection.getEndPoint().isOptimizedForDirectBuffers();
}
public IStream getStream() public IStream getStream()
{ {
return stream; return stream;

View File

@ -91,6 +91,12 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_connection = connection; _connection = connection;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
@Override @Override
public void onOpen() public void onOpen()
{ {

View File

@ -55,6 +55,12 @@ public class ChannelEndPoint extends AbstractEndPoint
_gathering=_channel instanceof GatheringByteChannel?((GatheringByteChannel)_channel):null; _gathering=_channel instanceof GatheringByteChannel?((GatheringByteChannel)_channel):null;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return true;
}
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {

View File

@ -237,5 +237,10 @@ public interface EndPoint extends Closeable
*/ */
void onClose(); void onClose();
/** Is the endpoint optimized for DirectBuffer usage
* @return True if direct buffers can be used optimally.
*/
boolean isOptimizedForDirectBuffers();
} }

View File

@ -85,7 +85,7 @@ public class SslConnection extends AbstractConnection
private ByteBuffer _decryptedInput; private ByteBuffer _decryptedInput;
private ByteBuffer _encryptedInput; private ByteBuffer _encryptedInput;
private ByteBuffer _encryptedOutput; private ByteBuffer _encryptedOutput;
private final boolean _encryptedDirectBuffers = false; private final boolean _encryptedDirectBuffers = true;
private final boolean _decryptedDirectBuffers = false; private final boolean _decryptedDirectBuffers = false;
private boolean _renegotiationAllowed; private boolean _renegotiationAllowed;
private final Runnable _runCompletWrite = new Runnable() private final Runnable _runCompletWrite = new Runnable()

View File

@ -186,6 +186,12 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _configuration; return _configuration;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return getHttpTransport().isOptimizedForDirectBuffers();
}
public Server getServer() public Server getServer()
{ {
return _connector.getServer(); return _connector.getServer();

View File

@ -152,6 +152,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return _generator; return _generator;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return getEndPoint().isOptimizedForDirectBuffers();
}
@Override @Override
public int getMessagesIn() public int getMessagesIn()
{ {

View File

@ -58,13 +58,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
void write(ByteBuffer content, boolean complete, Callback callback); void write(ByteBuffer content, boolean complete, Callback callback);
Interceptor getNextInterceptor(); Interceptor getNextInterceptor();
boolean isOptimizedForDirectBuffers();
} }
private static Logger LOG = Log.getLogger(HttpOutput.class); private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel _channel; private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlock; private final SharedBlockingCallback _writeBlock;
private Interceptor _filter; private Interceptor _interceptor;
/** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */ /** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */
private long _written; private long _written;
@ -90,7 +91,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public HttpOutput(HttpChannel channel) public HttpOutput(HttpChannel channel)
{ {
_channel = channel; _channel = channel;
_filter = channel; _interceptor = channel;
_writeBlock = new SharedBlockingCallback() _writeBlock = new SharedBlockingCallback()
{ {
@Override @Override
@ -111,12 +112,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public Interceptor getInterceptor() public Interceptor getInterceptor()
{ {
return _filter; return _interceptor;
} }
public void setInterceptor(Interceptor filter) public void setInterceptor(Interceptor filter)
{ {
_filter=filter; _interceptor=filter;
} }
public boolean isWritten() public boolean isWritten()
@ -162,7 +163,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected void write(ByteBuffer content, boolean complete, Callback callback) protected void write(ByteBuffer content, boolean complete, Callback callback)
{ {
_filter.write(content, complete, callback); _interceptor.write(content, complete, callback);
} }
private void abort(Throwable failure) private void abort(Throwable failure)
@ -336,7 +337,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!complete && len<=_commitSize) if (!complete && len<=_commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer // YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
@ -381,7 +382,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!complete && len<=_commitSize) if (!complete && len<=_commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false); _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer // YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
@ -509,7 +510,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
case OPEN: case OPEN:
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b); BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full // Check if all written or full
@ -529,7 +530,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
continue; continue;
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b); BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full // Check if all written or full
@ -795,7 +796,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void recycle() public void recycle()
{ {
resetBuffer(); resetBuffer();
_filter=_channel; _interceptor=_channel;
} }
public void resetBuffer() public void resetBuffer()

View File

@ -23,6 +23,10 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
/* ------------------------------------------------------------ */
/** Abstraction of the outbound HTTP transport.
*/
public interface HttpTransport public interface HttpTransport
{ {
void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback); void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback);
@ -47,4 +51,10 @@ public interface HttpTransport
* @param failure the failure that caused the abort. * @param failure the failure that caused the abort.
*/ */
void abort(Throwable failure); void abort(Throwable failure);
/* ------------------------------------------------------------ */
/** Is the underlying transport optimized for DirectBuffer usage
* @return True if direct buffers can be used optimally.
*/
boolean isOptimizedForDirectBuffers();
} }

View File

@ -232,6 +232,12 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_remote=remote; _remote=remote;
_local=local; _local=local;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return _endp.isOptimizedForDirectBuffers();
}
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {

View File

@ -37,7 +37,6 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.PathMap; import org.eclipse.jetty.http.PathMap;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.server.handler.HandlerWrapper;
@ -51,7 +50,7 @@ import org.eclipse.jetty.util.log.Logger;
/** /**
* A Handler that can dynamically GZIP compress responses. Unlike * A Handler that can dynamically GZIP compress responses. Unlike
* previous and 3rd party GzipFilters, this mechanism works with asynchronously * previous and 3rd party GzipFilters, this mechanism works with asynchronously
* generated responses and does not need to wrap the response or it's ownput * generated responses and does not need to wrap the response or it's output
* stream. Instead it uses the efficient {@link HttpOutput.Interceptor} mechanism. * stream. Instead it uses the efficient {@link HttpOutput.Interceptor} mechanism.
* <p> * <p>
* The handler can be applied to the entire server (a gzip.mod is included in * The handler can be applied to the entire server (a gzip.mod is included in

View File

@ -84,6 +84,13 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
return _interceptor; return _interceptor;
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return false; // No point as deflator is in user space.
}
@Override @Override
public void write(ByteBuffer content, boolean complete, Callback callback) public void write(ByteBuffer content, boolean complete, Callback callback)
{ {
@ -133,12 +140,7 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
private void gzip(ByteBuffer content, boolean complete, final Callback callback) private void gzip(ByteBuffer content, boolean complete, final Callback callback)
{ {
if (content.hasRemaining() || complete) if (content.hasRemaining() || complete)
{ new GzipBufferCB(content,complete,callback).iterate();
if (content.hasArray())
new GzipArrayCB(content,complete,callback).iterate();
else
new GzipBufferCB(content,complete,callback).iterate();
}
else else
callback.succeeded(); callback.succeeded();
} }
@ -215,7 +217,6 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
fields.put(HttpHeader.ETAG,etag); fields.put(HttpHeader.ETAG,etag);
} }
LOG.debug("{} compressing {}",this,_deflater); LOG.debug("{} compressing {}",this,_deflater);
_state.set(GZState.COMPRESSING); _state.set(GZState.COMPRESSING);
@ -270,67 +271,15 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
{ {
return _state.get()==GZState.MIGHT_COMPRESS; return _state.get()==GZState.MIGHT_COMPRESS;
} }
private class GzipArrayCB extends IteratingNestedCallback
{
private final boolean _complete;
public GzipArrayCB(ByteBuffer content, boolean complete, Callback callback)
{
super(callback);
_complete=complete;
byte[] array=content.array();
int off=content.arrayOffset()+content.position();
int len=content.remaining();
_crc.update(array,off,len);
_deflater.setInput(array,off,len);
if (complete)
_deflater.finish();
content.position(content.limit());
}
@Override
protected Action process() throws Exception
{
if (_deflater.needsInput())
{
if (_deflater.finished())
{
_factory.recycle(_deflater);
_deflater=null;
_channel.getByteBufferPool().release(_buffer);
_buffer=null;
return Action.SUCCEEDED;
}
if (!_complete)
return Action.SUCCEEDED;
_deflater.finish();
}
BufferUtil.compact(_buffer);
int off=_buffer.arrayOffset()+_buffer.limit();
int len=_buffer.capacity()-_buffer.limit()- (_complete?8:0);
int produced=_deflater.deflate(_buffer.array(),off,len,Deflater.NO_FLUSH);
_buffer.limit(_buffer.limit()+produced);
boolean complete=_deflater.finished();
if (complete)
addTrailer();
_interceptor.write(_buffer,complete,this);
return Action.SCHEDULED;
}
}
private class GzipBufferCB extends IteratingNestedCallback private class GzipBufferCB extends IteratingNestedCallback
{ {
private final ByteBuffer _input; private ByteBuffer _copy;
private final ByteBuffer _content; private final ByteBuffer _content;
private final boolean _last; private final boolean _last;
public GzipBufferCB(ByteBuffer content, boolean complete, Callback callback) public GzipBufferCB(ByteBuffer content, boolean complete, Callback callback)
{ {
super(callback); super(callback);
_input=_channel.getByteBufferPool().acquire(Math.min(_bufferSize,content.remaining()),false);
_content=content; _content=content;
_last=complete; _last=complete;
} }
@ -348,6 +297,11 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
_deflater=null; _deflater=null;
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
_buffer=null; _buffer=null;
if (_copy!=null)
{
_channel.getByteBufferPool().release(_copy);
_copy=null;
}
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -358,17 +312,31 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
_deflater.finish(); _deflater.finish();
} }
else if (_content.hasArray())
{
byte[] array=_content.array();
int off=_content.arrayOffset()+_content.position();
int len=_content.remaining();
BufferUtil.clear(_content);
_crc.update(array,off,len);
_deflater.setInput(array,off,len);
if (_last)
_deflater.finish();
}
else else
{ {
BufferUtil.clearToFill(_input); if (_copy==null)
int took=BufferUtil.put(_content,_input); _copy=_channel.getByteBufferPool().acquire(_bufferSize,false);
BufferUtil.flipToFlush(_input,0); BufferUtil.clearToFill(_copy);
int took=BufferUtil.put(_content,_copy);
BufferUtil.flipToFlush(_copy,0);
if (took==0) if (took==0)
throw new IllegalStateException(); throw new IllegalStateException();
byte[] array=_input.array(); byte[] array=_copy.array();
int off=_input.arrayOffset()+_input.position(); int off=_copy.arrayOffset()+_copy.position();
int len=_input.remaining(); int len=_copy.remaining();
_crc.update(array,off,len); _crc.update(array,off,len);
_deflater.setInput(array,off,len); _deflater.setInput(array,off,len);

View File

@ -110,6 +110,11 @@ public class ResponseTest
{ {
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
}); });
} }

View File

@ -26,6 +26,7 @@ import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
@ -518,8 +519,6 @@ public class AsyncIOServletTest
in.setReadListener(new ReadListener() in.setReadListener(new ReadListener()
{ {
transient int _i=0;
@Override @Override
public void onError(Throwable t) public void onError(Throwable t)
{ {
@ -575,6 +574,8 @@ public class AsyncIOServletTest
{ {
OutputStream output = client.getOutputStream(); OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8")); output.write(request.getBytes("UTF-8"));
output.flush();
Thread.sleep(100);
output.write(data); output.write(data);
output.flush(); output.flush();

View File

@ -80,6 +80,12 @@ public class HttpTransportOverSPDY implements HttpTransport
this.version = session.getVersion(); this.version = session.getVersion();
} }
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
protected Stream getStream() protected Stream getStream()
{ {
return stream; return stream;