Fixes #3952 - Server configuration for direct/heap ByteBuffers.

Updated server-side to use direct/heap ByteBuffers based on
getters and setters in the relevant components.
Made HTTP/1.1, HTTP/2, and WebSocket use the same mechanism.

Removed unused obsoleted methods:
* EndPoint.isOptimizedForDirectBuffers()
* HttpTransport.isOptimizedForDirectBuffers()
* HttpOutput.Interceptor.isOptimizedForDirectBuffers()
* HttpChannel.useDirectBuffers()

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-08-12 17:20:50 +02:00
parent 5dbccaed4e
commit 5ccaea3c74
34 changed files with 285 additions and 215 deletions

View File

@ -49,12 +49,6 @@ public class HttpTransportOverFCGI implements HttpTransport
this.request = request;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
@Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
{

View File

@ -56,6 +56,8 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final ISession session;
private final int bufferSize;
private final ExecutionStrategy strategy;
private boolean useInputDirectBuffers;
private boolean useOutputDirectBuffers;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
@ -99,6 +101,26 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
producer.setInputBuffer(buffer);
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectBuffers)
{
this.useInputDirectBuffers = useInputDirectBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
this.useOutputDirectBuffers = useOutputDirectBuffers;
}
@Override
public void onOpen()
{
@ -389,7 +411,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
{
private NetworkBuffer()
{
super(byteBufferPool, bufferSize, false);
super(byteBufferPool, bufferSize, isUseInputDirectByteBuffers());
}
private void put(ByteBuffer source)

View File

@ -44,4 +44,9 @@ public abstract class FrameGenerator
{
return headerGenerator.getMaxFrameSize();
}
public boolean isUseDirectByteBuffers()
{
return headerGenerator.isUseDirectByteBuffers();
}
}

View File

@ -38,10 +38,15 @@ public class Generator
}
public Generator(ByteBufferPool byteBufferPool, int maxDynamicTableSize, int maxHeaderBlockFragment)
{
this(byteBufferPool, true, maxDynamicTableSize, maxHeaderBlockFragment);
}
public Generator(ByteBufferPool byteBufferPool, boolean directBuffers, int maxDynamicTableSize, int maxHeaderBlockFragment)
{
this.byteBufferPool = byteBufferPool;
headerGenerator = new HeaderGenerator();
headerGenerator = new HeaderGenerator(directBuffers);
hpackEncoder = new HpackEncoder(maxDynamicTableSize);
this.generators = new FrameGenerator[FrameType.values().length];

View File

@ -27,10 +27,26 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class HeaderGenerator
{
private int maxFrameSize = Frame.DEFAULT_MAX_LENGTH;
private final boolean directBuffers;
public HeaderGenerator()
{
this(true);
}
public HeaderGenerator(boolean directBuffers)
{
this.directBuffers = directBuffers;
}
public boolean isUseDirectByteBuffers()
{
return directBuffers;
}
public ByteBuffer generate(ByteBufferPool.Lease lease, FrameType frameType, int capacity, int length, int flags, int streamId)
{
ByteBuffer header = lease.acquire(capacity, true);
ByteBuffer header = lease.acquire(capacity, isUseDirectByteBuffers());
header.put((byte)((length & 0x00_FF_00_00) >>> 16));
header.put((byte)((length & 0x00_00_FF_00) >>> 8));
header.put((byte)((length & 0x00_00_00_FF)));

View File

@ -67,7 +67,7 @@ public class HeadersGenerator extends FrameGenerator
flags = Flags.PRIORITY;
int maxFrameSize = getMaxFrameSize();
ByteBuffer hpacked = lease.acquire(maxFrameSize, false);
ByteBuffer hpacked = lease.acquire(maxFrameSize, isUseDirectByteBuffers());
BufferUtil.clearToFill(hpacked);
encoder.encode(hpacked, metaData);
int hpackedLength = hpacked.position();

View File

@ -58,7 +58,7 @@ public class PushPromiseGenerator extends FrameGenerator
int extraSpace = 4;
maxFrameSize -= extraSpace;
ByteBuffer hpacked = lease.acquire(maxFrameSize, false);
ByteBuffer hpacked = lease.acquire(maxFrameSize, isUseDirectByteBuffers());
BufferUtil.clearToFill(hpacked);
encoder.encode(hpacked, metaData);
int hpackedLength = hpacked.position();

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.hpack.HpackContext.Entry;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -76,13 +76,8 @@ public class HpackDecoder
while (buffer.hasRemaining())
{
if (LOG.isDebugEnabled() && buffer.hasArray())
{
int l = Math.min(buffer.remaining(), 32);
LOG.debug("decode {}{}",
TypeUtil.toHexString(buffer.array(), buffer.arrayOffset() + buffer.position(), l),
l < buffer.remaining() ? "..." : "");
}
if (LOG.isDebugEnabled())
LOG.debug("decode {}", BufferUtil.toHexString(buffer));
byte b = buffer.get();
if (b < 0)
@ -258,14 +253,9 @@ public class HpackDecoder
public static String toASCIIString(ByteBuffer buffer, int length)
{
StringBuilder builder = new StringBuilder(length);
int position = buffer.position();
int start = buffer.arrayOffset() + position;
int end = start + length;
buffer.position(position + length);
byte[] array = buffer.array();
for (int i = start; i < end; i++)
for (int i = 0; i < length; ++i)
{
builder.append((char)(0x7f & array[i]));
builder.append((char)(0x7F & buffer.get()));
}
return builder.toString();
}

View File

@ -34,9 +34,9 @@ import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http2.hpack.HpackContext.Entry;
import org.eclipse.jetty.http2.hpack.HpackContext.StaticEntry;
import org.eclipse.jetty.util.ArrayTrie;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Trie;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -354,9 +354,8 @@ public class HpackEncoder
if (_debug)
{
int e = buffer.position();
if (LOG.isDebugEnabled())
LOG.debug("encode {}:'{}' to '{}'", encoding, field, TypeUtil.toHexString(buffer.array(), buffer.arrayOffset() + p, e - p));
LOG.debug("encode {}:'{}' to '{}'", encoding, field, BufferUtil.toHexString(buffer));
}
}

View File

@ -60,6 +60,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private boolean _useInputDirectBuffers;
private boolean _useOutputDirectBuffers;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@ -78,6 +80,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.httpConfiguration = Objects.requireNonNull(httpConfiguration);
addBean(httpConfiguration);
setInputBufferSize(Frame.DEFAULT_MAX_LENGTH + Frame.HEADER_LENGTH);
setUseInputDirectByteBuffers(httpConfiguration.isUseInputDirectByteBuffers());
setUseOutputDirectByteBuffers(httpConfiguration.isUseOutputDirectByteBuffers());
}
@ManagedAttribute("The HPACK dynamic table maximum size")
@ -178,6 +182,26 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxSettingsKeys = maxSettingsKeys;
}
public boolean isUseInputDirectByteBuffers()
{
return _useInputDirectBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectBuffers)
{
_useInputDirectBuffers = useInputDirectBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
_useOutputDirectBuffers = useOutputDirectBuffers;
}
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@ -200,7 +224,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
{
ServerSessionListener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getMaxDynamicTableSize(), getMaxHeaderBlockFragment());
Generator generator = new Generator(connector.getByteBufferPool(), isUseOutputDirectByteBuffers(), getMaxDynamicTableSize(), getMaxHeaderBlockFragment());
FlowControlStrategy flowControl = getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener, flowControl);
session.setMaxLocalStreams(getMaxConcurrentStreams());
@ -222,6 +246,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
connection.addListener(sessionContainer);
return configure(connection, connector, endPoint);
}

View File

@ -287,6 +287,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream);
channel = newServerHttpChannelOverHTTP2(connector, httpConfig, transport);
channel.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Creating channel {} for {}", channel, this);
}

View File

@ -55,6 +55,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
private boolean _expect100Continue;
private boolean _delayedUntilContent;
private boolean _useOutputDirectBuffers;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
@ -66,6 +67,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
return getHttpTransport().getStream();
}
@Override
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
_useOutputDirectBuffers = useOutputDirectBuffers;
}
@Override
public boolean isExpecting100Continue()
{

View File

@ -58,14 +58,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
this.connection = connection;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
// Because sent buffers are passed directly to the endpoint without
// copying we can defer to the endpoint
return connection.getEndPoint().isOptimizedForDirectBuffers();
}
public IStream getStream()
{
return stream;

View File

@ -327,12 +327,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_connection = connection;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
protected void reset()
{
_state.set(State.OPEN);

View File

@ -175,12 +175,6 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
_gather = (channel instanceof GatheringByteChannel) ? (GatheringByteChannel)channel : null;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return true;
}
@Override
public boolean isOpen()
{

View File

@ -269,13 +269,6 @@ public interface EndPoint extends Closeable
*/
void onClose(Throwable cause);
/**
* Is the endpoint optimized for DirectBuffer usage
*
* @return True if direct buffers can be used optimally.
*/
boolean isOptimizedForDirectBuffers();
/**
* Upgrade connections.
* Close the old connection, update the endpoint and open the new connection.

View File

@ -44,7 +44,6 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.QuietException;
@ -228,12 +227,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _configuration;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return getHttpTransport().isOptimizedForDirectBuffers();
}
public Server getServer()
{
return _connector.getServer();
@ -952,12 +945,9 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _connector.getScheduler();
}
/**
* @return true if the HttpChannel can efficiently use direct buffer (typically this means it is not over SSL or a multiplexed protocol)
*/
public boolean useDirectBuffers()
public boolean isUseOutputDirectByteBuffers()
{
return getEndPoint() instanceof ChannelEndPoint;
return getHttpConfiguration().isUseOutputDirectByteBuffers();
}
/**

View File

@ -69,6 +69,12 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
_metadata.setURI(new HttpURI());
}
@Override
public boolean isUseOutputDirectByteBuffers()
{
return _httpConnection.isUseOutputDirectByteBuffers();
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{

View File

@ -34,8 +34,6 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* HTTP Configuration.
@ -51,8 +49,6 @@ import org.eclipse.jetty.util.log.Logger;
@ManagedObject("HTTP Configuration")
public class HttpConfiguration implements Dumpable
{
private static final Logger LOG = Log.getLogger(HttpConfiguration.class);
public static final String SERVER_VERSION = "Jetty(" + Jetty.VERSION + ")";
private final List<Customizer> _customizers = new CopyOnWriteArrayList<>();
private final Trie<Boolean> _formEncodedMethods = new TreeTrie<>();
@ -71,7 +67,8 @@ public class HttpConfiguration implements Dumpable
private boolean _delayDispatchUntilContent = true;
private boolean _persistentConnectionsEnabled = true;
private int _maxErrorDispatches = 10;
private boolean _useDirectByteBuffers = false;
private boolean _useInputDirectByteBuffers = true;
private boolean _useOutputDirectByteBuffers = true;
private long _minRequestDataRate;
private long _minResponseDataRate;
private HttpCompliance _httpCompliance = HttpCompliance.RFC7230;
@ -128,6 +125,7 @@ public class HttpConfiguration implements Dumpable
_requestHeaderSize = config._requestHeaderSize;
_responseHeaderSize = config._responseHeaderSize;
_headerCacheSize = config._headerCacheSize;
_headerCacheCaseSensitive = config._headerCacheCaseSensitive;
_secureScheme = config._secureScheme;
_securePort = config._securePort;
_idleTimeout = config._idleTimeout;
@ -137,9 +135,11 @@ public class HttpConfiguration implements Dumpable
_delayDispatchUntilContent = config._delayDispatchUntilContent;
_persistentConnectionsEnabled = config._persistentConnectionsEnabled;
_maxErrorDispatches = config._maxErrorDispatches;
_useDirectByteBuffers = config._useDirectByteBuffers;
_useInputDirectByteBuffers = config._useInputDirectByteBuffers;
_useOutputDirectByteBuffers = config._useOutputDirectByteBuffers;
_minRequestDataRate = config._minRequestDataRate;
_minResponseDataRate = config._minResponseDataRate;
_httpCompliance = config._httpCompliance;
_requestCookieCompliance = config._requestCookieCompliance;
_responseCookieCompliance = config._responseCookieCompliance;
_notifyRemoteAsyncErrors = config._notifyRemoteAsyncErrors;
@ -329,17 +329,31 @@ public class HttpConfiguration implements Dumpable
}
/**
* @param useDirectByteBuffers if true, use direct byte buffers for requests
* @param useInputDirectByteBuffers whether to use direct ByteBuffers for reading
*/
public void setUseDirectByteBuffers(boolean useDirectByteBuffers)
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
_useDirectByteBuffers = useDirectByteBuffers;
_useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct byte buffers for requests")
public boolean isUseDirectByteBuffers()
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return _useDirectByteBuffers;
return _useInputDirectByteBuffers;
}
/**
* @param useOutputDirectByteBuffers whether to use direct ByteBuffers for writing
*/
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
_useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectByteBuffers;
}
/**
@ -557,7 +571,7 @@ public class HttpConfiguration implements Dumpable
}
/**
* @return The CookieCompliance used for parsing request <code>Cookie</code> headers.
* @return The CookieCompliance used for parsing request {@code Cookie} headers.
* @see #getResponseCookieCompliance()
*/
public CookieCompliance getRequestCookieCompliance()
@ -566,7 +580,7 @@ public class HttpConfiguration implements Dumpable
}
/**
* @return The CookieCompliance used for generating response <code>Set-Cookie</code> headers
* @return The CookieCompliance used for generating response {@code Set-Cookie} headers
* @see #getRequestCookieCompliance()
*/
public CookieCompliance getResponseCookieCompliance()
@ -575,8 +589,7 @@ public class HttpConfiguration implements Dumpable
}
/**
* @param cookieCompliance The CookieCompliance to use for parsing request <code>Cookie</code> headers.
* @see #setRequestCookieCompliance(CookieCompliance)
* @param cookieCompliance The CookieCompliance to use for parsing request {@code Cookie} headers.
*/
public void setRequestCookieCompliance(CookieCompliance cookieCompliance)
{
@ -584,8 +597,7 @@ public class HttpConfiguration implements Dumpable
}
/**
* @param cookieCompliance The CookieCompliance to use for generating response <code>Set-Cookie</code> headers
* @see #setResponseCookieCompliance(CookieCompliance)
* @param cookieCompliance The CookieCompliance to use for generating response {@code Set-Cookie} headers
*/
public void setResponseCookieCompliance(CookieCompliance cookieCompliance)
{

View File

@ -73,6 +73,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final boolean _recordHttpComplianceViolations;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
private boolean _useInputDirectBuffers;
private boolean _useOutputDirectBuffers;
/**
* Get the current connection that this thread is dispatched to.
@ -164,12 +166,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return _generator;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return getEndPoint().isOptimizedForDirectBuffers();
}
@Override
public long getMessagesIn()
{
@ -182,6 +178,26 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return getHttpChannel().getRequests();
}
public boolean isUseInputDirectByteBuffers()
{
return _useInputDirectBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectBuffers)
{
_useInputDirectBuffers = useInputDirectBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
_useOutputDirectBuffers = useOutputDirectBuffers;
}
@Override
public ByteBuffer onUpgradeFrom()
{
@ -224,7 +240,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), _config.isUseDirectByteBuffers());
{
boolean useDirectBuffers = isUseInputDirectByteBuffers();
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), useDirectBuffers);
}
return _requestBuffer;
}
@ -737,6 +756,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (_callback == null)
throw new IllegalStateException();
boolean useDirectBuffers = isUseOutputDirectByteBuffers();
ByteBuffer chunk = _chunk;
while (true)
{
@ -757,19 +777,19 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
case NEED_HEADER:
{
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), _config.isUseDirectByteBuffers());
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), useDirectBuffers);
continue;
}
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, _config.isUseDirectByteBuffers());
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, useDirectBuffers);
continue;
}
case NEED_CHUNK_TRAILER:
{
if (_chunk != null)
_bufferPool.release(_chunk);
chunk = _chunk = _bufferPool.acquire(_config.getResponseHeaderSize(), _config.isUseDirectByteBuffers());
chunk = _chunk = _bufferPool.acquire(_config.getResponseHeaderSize(), useDirectBuffers);
continue;
}
case FLUSH:

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.server;
import java.util.Objects;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -32,7 +34,9 @@ import org.eclipse.jetty.util.annotation.Name;
public class HttpConnectionFactory extends AbstractConnectionFactory implements HttpConfiguration.ConnectionFactory
{
private final HttpConfiguration _config;
private boolean _recordHttpComplianceViolations = false;
private boolean _recordHttpComplianceViolations;
private boolean _useInputDirectBuffers;
private boolean _useOutputDirectBuffers;
public HttpConnectionFactory()
{
@ -42,10 +46,10 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
public HttpConnectionFactory(@Name("config") HttpConfiguration config)
{
super(HttpVersion.HTTP_1_1.asString());
_config = config;
if (config == null)
throw new IllegalArgumentException("Null HttpConfiguration");
_config = Objects.requireNonNull(config);
addBean(_config);
setUseInputDirectByteBuffers(_config.isUseInputDirectByteBuffers());
setUseOutputDirectByteBuffers(_config.isUseOutputDirectByteBuffers());
}
@Override
@ -59,15 +63,37 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
return _recordHttpComplianceViolations;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
HttpConnection conn = new HttpConnection(_config, connector, endPoint, isRecordHttpComplianceViolations());
return configure(conn, connector, endPoint);
}
public void setRecordHttpComplianceViolations(boolean recordHttpComplianceViolations)
{
this._recordHttpComplianceViolations = recordHttpComplianceViolations;
}
public boolean isUseInputDirectByteBuffers()
{
return _useInputDirectBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectBuffers)
{
_useInputDirectBuffers = useInputDirectBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
_useOutputDirectBuffers = useOutputDirectBuffers;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
HttpConnection connection = new HttpConnection(_config, connector, endPoint, isRecordHttpComplianceViolations());
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
return configure(connection, connector, endPoint);
}
}

View File

@ -103,14 +103,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
Interceptor getNextInterceptor();
/**
* @return True if the Interceptor is optimized to receive direct
* {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)}
* method. If false is returned, then passing direct buffers may cause
* inefficiencies.
*/
boolean isOptimizedForDirectBuffers();
/**
* Reset the buffers.
* <p>If the Interceptor contains buffers then reset them.
@ -458,8 +450,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
ensureAggregate();
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
@ -500,12 +491,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
ensureAggregate();
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
@ -559,6 +548,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
closed();
}
private void ensureAggregate()
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
}
public void write(ByteBuffer buffer) throws IOException
{
// This write always bypasses aggregate buffer
@ -630,8 +625,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch (_state.get())
{
case OPEN:
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
ensureAggregate();
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
@ -650,8 +644,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
ensureAggregate();
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
@ -984,7 +977,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break;
}
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
ByteBuffer buffer = _channel.isUseOutputDirectByteBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null)
buffer = httpContent.getIndirectBuffer();
@ -1406,6 +1399,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
super(callback);
_in = in;
// Reading from InputStream requires byte[], don't use direct buffers.
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
}
@ -1458,7 +1452,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true.
* {@link HttpChannel#isUseOutputDirectByteBuffers()} is true.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
@ -1473,7 +1467,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
super(callback);
_in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
}
@Override

View File

@ -71,11 +71,4 @@ public interface HttpTransport
* @param failure the failure that caused the abort.
*/
void abort(Throwable failure);
/**
* Is the underlying transport optimized for DirectBuffer usage
*
* @return True if direct buffers can be used optimally.
*/
boolean isOptimizedForDirectBuffers();
}

View File

@ -601,12 +601,6 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_local = local;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return _endp.isOptimizedForDirectBuffers();
}
@Override
public InetSocketAddress getLocalAddress()
{

View File

@ -271,12 +271,6 @@ public class BufferedResponseHandler extends HandlerWrapper
return _next;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
protected void commit(Queue<ByteBuffer> buffers, Callback callback)
{
// If only 1 buffer

View File

@ -93,12 +93,6 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
return _interceptor;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false; // No point as deflator is in user space.
}
@Override
public void write(ByteBuffer content, boolean complete, Callback callback)
{

View File

@ -651,12 +651,6 @@ public class HttpOutputTest
_next.write(BufferUtil.toBuffer(s), complete, callback);
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return _next.isOptimizedForDirectBuffers();
}
@Override
public Interceptor getNextInterceptor()
{

View File

@ -173,12 +173,6 @@ public class ResponseTest
{
_channelError = failure;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
});
}

View File

@ -50,6 +50,8 @@ public class FrameFlusher extends IteratingCallback
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new ClosedChannelException();
private final LongAdder messagesOut = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
private final ByteBufferPool bufferPool;
private final EndPoint endPoint;
private final int bufferSize;
@ -62,13 +64,12 @@ public class FrameFlusher extends IteratingCallback
private final List<Entry> previousEntries;
private final List<Entry> failedEntries;
private ByteBuffer batchBuffer = null;
private ByteBuffer batchBuffer;
private boolean canEnqueue = true;
private boolean flushed = true;
private Throwable closedCause;
private LongAdder messagesOut = new LongAdder();
private LongAdder bytesOut = new LongAdder();
private long idleTimeout = 0;
private long idleTimeout;
private boolean useDirectBuffers;
public FrameFlusher(ByteBufferPool bufferPool, Scheduler scheduler, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
{
@ -84,6 +85,16 @@ public class FrameFlusher extends IteratingCallback
this.timeoutScheduler = scheduler;
}
public boolean isUseDirectByteBuffers()
{
return useDirectBuffers;
}
public void setUseDirectByteBuffers(boolean useDirectBuffers)
{
this.useDirectBuffers = useDirectBuffers;
}
/**
* Enqueue a Frame to be written to the endpoint.
*
@ -225,7 +236,7 @@ public class FrameFlusher extends IteratingCallback
// Acquire a batchBuffer if we don't have one
if (batchBuffer == null)
{
batchBuffer = bufferPool.acquire(bufferSize, true);
batchBuffer = acquireBuffer(bufferSize);
buffers.add(batchBuffer);
}
@ -249,7 +260,10 @@ public class FrameFlusher extends IteratingCallback
else
{
// Add headers and payload to the list of buffers
buffers.add(entry.generateHeaderBytes());
// TODO: release this buffer.
ByteBuffer buffer = acquireBuffer(Generator.MAX_HEADER_LENGTH);
buffers.add(buffer);
entry.generateHeaderBytes(buffer);
flush = true;
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
@ -308,6 +322,11 @@ public class FrameFlusher extends IteratingCallback
return Action.SCHEDULED;
}
private ByteBuffer acquireBuffer(int capacity)
{
return bufferPool.acquire(capacity, isUseDirectByteBuffers());
}
private int getQueueSize()
{
synchronized (this)
@ -474,11 +493,6 @@ public class FrameFlusher extends IteratingCallback
super(frame, callback, batch);
}
private ByteBuffer generateHeaderBytes()
{
return headerBuffer = generator.generateHeaderBytes(frame);
}
private void generateHeaderBytes(ByteBuffer buffer)
{
int pos = BufferUtil.flipToFill(buffer);

View File

@ -68,6 +68,8 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
// Read / Parse variables
private RetainableByteBuffer networkBuffer;
private boolean useInputDirectBuffers;
private boolean useOutputDirectBuffers;
/**
* Create a WSConnection.
@ -132,6 +134,26 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
return getEndPoint().getRemoteAddress();
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectBuffers)
{
this.useInputDirectBuffers = useInputDirectBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectBuffers)
{
this.useOutputDirectBuffers = useOutputDirectBuffers;
}
/**
* Physical connection disconnect.
* <p>
@ -222,7 +244,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
synchronized (this)
{
if (networkBuffer == null)
networkBuffer = new RetainableByteBuffer(bufferPool, getInputBufferSize());
networkBuffer = newNetworkBuffer(getInputBufferSize());
}
}
@ -237,10 +259,15 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
throw new IllegalStateException();
networkBuffer.release();
networkBuffer = new RetainableByteBuffer(bufferPool, getInputBufferSize());
networkBuffer = newNetworkBuffer(getInputBufferSize());
}
}
private RetainableByteBuffer newNetworkBuffer(int capacity)
{
return new RetainableByteBuffer(bufferPool, capacity, isUseInputDirectByteBuffers());
}
private void releaseNetworkBuffer()
{
synchronized (this)
@ -445,7 +472,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
{
synchronized (this)
{
networkBuffer = new RetainableByteBuffer(bufferPool, prefilled.remaining());
networkBuffer = newNetworkBuffer(prefilled.remaining());
}
ByteBuffer buffer = networkBuffer.getBuffer();
BufferUtil.clearToFill(buffer);
@ -572,6 +599,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
private Flusher(Scheduler scheduler, int bufferSize, Generator generator, EndPoint endpoint)
{
super(bufferPool, scheduler, generator, endpoint, bufferSize, 8);
setUseDirectByteBuffers(isUseOutputDirectByteBuffers());
}
@Override

View File

@ -201,6 +201,11 @@ public final class RFC6455Handshaker implements Handshaker
// Create a connection
WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
// TODO: perhaps use of direct buffers should be WebSocket specific
// rather than inheriting the setting from HttpConfiguration.
HttpConfiguration httpConfig = httpChannel.getHttpConfiguration();
connection.setUseInputDirectByteBuffers(httpConfig.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(httpChannel.isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
@ -88,7 +89,7 @@ public class ValidationExtensionTest extends WebSocketTester
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayload().array(), is(nonUtf8Payload));
assertThat(BufferUtil.toArray(frame.getPayload()), is(nonUtf8Payload));
//close normally
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
@ -113,13 +114,13 @@ public class ValidationExtensionTest extends WebSocketTester
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayload().array(), is(initialPayload));
assertThat(BufferUtil.toArray(frame.getPayload()), is(initialPayload));
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.CONTINUATION, continuationPayload, true));
frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CONTINUATION));
assertThat(frame.getPayload().array(), is(continuationPayload));
assertThat(BufferUtil.toArray(frame.getPayload()), is(continuationPayload));
//close normally
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
@ -144,7 +145,7 @@ public class ValidationExtensionTest extends WebSocketTester
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayload().array(), is(initialPayload));
assertThat(BufferUtil.toArray(frame.getPayload()), is(initialPayload));
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.CONTINUATION, incompleteContinuationPayload, true));
frame = receiveFrame(client.getInputStream());

View File

@ -164,12 +164,6 @@ public class MockEndpoint implements EndPoint
throw new UnsupportedOperationException(NOT_SUPPORTED);
}
@Override
public boolean isOptimizedForDirectBuffers()
{
throw new UnsupportedOperationException(NOT_SUPPORTED);
}
@Override
public void upgrade(Connection newConnection)
{

View File

@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -43,8 +42,6 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -207,29 +204,6 @@ public class WebSocketServerTest extends WebSocketTester
}
assertThat(serverHandler.receivedFrames.size(), is(5));
assertThat(receivedCallbacks.size(), is(5));
byte[] first = serverHandler.receivedFrames.poll().getPayload().array();
assertThat(serverHandler.receivedFrames.poll().getPayload().array(), sameInstance(first));
assertThat(serverHandler.receivedFrames.poll().getPayload().array(), sameInstance(first));
byte[] second = serverHandler.receivedFrames.poll().getPayload().array();
assertThat(serverHandler.receivedFrames.poll().getPayload().array(), sameInstance(second));
assertThat(first, not(sameInstance(second)));
ByteBufferPool pool = server.getServer().getConnectors()[0].getByteBufferPool();
assertThat(pool.acquire(first.length, false).array(), not(sameInstance(first)));
receivedCallbacks.poll().succeeded();
assertThat(pool.acquire(first.length, false).array(), not(sameInstance(first)));
receivedCallbacks.poll().succeeded();
assertThat(pool.acquire(first.length, false).array(), not(sameInstance(first)));
receivedCallbacks.poll().succeeded();
assertThat(pool.acquire(first.length, false).array(), sameInstance(first));
assertThat(pool.acquire(second.length, false).array(), not(sameInstance(second)));
receivedCallbacks.poll().succeeded();
assertThat(pool.acquire(second.length, false).array(), not(sameInstance(second)));
receivedCallbacks.poll().succeeded();
assertThat(pool.acquire(second.length, false).array(), sameInstance(second));
}
}