Fixes #3953 - Client configuration for direct/heap ByteBuffers.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-02-05 09:49:44 +01:00
parent f8219a56cc
commit 4fbbaf8b8e
33 changed files with 307 additions and 116 deletions

View File

@ -150,6 +150,8 @@ public class HttpClient extends ContainerLifeCycle
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
private HttpCompliance httpCompliance = HttpCompliance.RFC7230;
private String defaultRequestContentType = "application/octet-stream";
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
/**
* Creates a HttpClient instance that can perform HTTP/1.1 requests to non-TLS and TLS destinations.
@ -1063,6 +1065,40 @@ public class HttpClient extends ContainerLifeCycle
this.defaultRequestContentType = contentType;
}
/**
* @return whether to use direct ByteBuffers for reading
*/
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
/**
* @param useInputDirectByteBuffers whether to use direct ByteBuffers for reading
*/
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
/**
* @return whether to use direct ByteBuffers for writing
*/
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
/**
* @param useOutputDirectByteBuffers whether to use direct ByteBuffers for writing
*/
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
/**
* @return the forward proxy configuration
*/

View File

@ -21,12 +21,8 @@ package org.eclipse.jetty.client.http;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
public class HttpClientConnectionFactory implements ClientConnectionFactory
{
@ -35,9 +31,7 @@ public class HttpClientConnectionFactory implements ClientConnectionFactory
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
return customize(new HttpConnectionOverHTTP(endPoint, destination, promise), context);
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, context);
return customize(connection, context);
}
}

View File

@ -28,11 +28,10 @@ import org.eclipse.jetty.client.DuplexHttpDestination;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -41,6 +40,7 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
{
public static final Origin.Protocol HTTP11 = new Origin.Protocol(List.of("http/1.1"), false);
private final ClientConnectionFactory factory = new HttpClientConnectionFactory();
private int headerCacheSize = 1024;
private boolean headerCacheCaseSensitive;
@ -76,18 +76,10 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
var connection = newHttpConnection(endPoint, destination, promise);
var connection = factory.newConnection(endPoint, context);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
return connection;
}
@ManagedAttribute("The maximum allowed size in bytes for an HTTP header field cache")

View File

@ -20,12 +20,14 @@ package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
@ -54,10 +56,25 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
private final Promise<Connection> promise;
private final Delegate delegate;
private final HttpChannelOverHTTP channel;
private long idleTimeout;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
private long idleTimeout;
public HttpConnectionOverHTTP(EndPoint endPoint, Map<String, Object> context)
{
this(endPoint, destinationFrom(context), promiseFrom(context));
}
private static HttpDestination destinationFrom(Map<String, Object> context)
{
return (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
}
@SuppressWarnings("unchecked")
private static Promise<Connection> promiseFrom(Map<String, Object> context)
{
return (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{

View File

@ -110,7 +110,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), true);
boolean direct = client.isUseInputDirectByteBuffers();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), direct);
}
private void releaseNetworkBuffer()
@ -129,7 +130,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (networkBuffer.hasRemaining())
{
ByteBuffer upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining());
HttpClient client = getHttpDestination().getHttpClient();
ByteBuffer upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining(), client.isUseInputDirectByteBuffers());
BufferUtil.clearToFill(upgradeBuffer);
BufferUtil.put(networkBuffer.getBuffer(), upgradeBuffer);
BufferUtil.flipToFlush(upgradeBuffer, 0);

View File

@ -39,13 +39,11 @@ import org.eclipse.jetty.util.IteratingCallback;
public class HttpSenderOverHTTP extends HttpSender
{
private final HttpGenerator generator = new HttpGenerator();
private final HttpClient httpClient;
private boolean shutdown;
public HttpSenderOverHTTP(HttpChannelOverHTTP channel)
{
super(channel);
httpClient = channel.getHttpDestination().getHttpClient();
}
@Override
@ -74,7 +72,9 @@ public class HttpSenderOverHTTP extends HttpSender
{
try
{
HttpClient httpClient = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = httpClient.getByteBufferPool();
boolean useDirectByteBuffers = httpClient.isUseOutputDirectByteBuffers();
ByteBuffer chunk = null;
while (true)
{
@ -89,12 +89,12 @@ public class HttpSenderOverHTTP extends HttpSender
{
case NEED_CHUNK:
{
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, useDirectByteBuffers);
break;
}
case NEED_CHUNK_TRAILER:
{
chunk = bufferPool.acquire(httpClient.getRequestBufferSize(), false);
chunk = bufferPool.acquire(httpClient.getRequestBufferSize(), useDirectByteBuffers);
break;
}
case FLUSH:
@ -218,21 +218,24 @@ public class HttpSenderOverHTTP extends HttpSender
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
HttpClient httpClient = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool byteBufferPool = httpClient.getByteBufferPool();
boolean useDirectByteBuffers = httpClient.isUseOutputDirectByteBuffers();
switch (result)
{
case NEED_HEADER:
{
headerBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
headerBuffer = byteBufferPool.acquire(httpClient.getRequestBufferSize(), useDirectByteBuffers);
break;
}
case NEED_CHUNK:
{
chunkBuffer = httpClient.getByteBufferPool().acquire(HttpGenerator.CHUNK_SIZE, false);
chunkBuffer = byteBufferPool.acquire(HttpGenerator.CHUNK_SIZE, useDirectByteBuffers);
break;
}
case NEED_CHUNK_TRAILER:
{
chunkBuffer = httpClient.getByteBufferPool().acquire(httpClient.getRequestBufferSize(), false);
chunkBuffer = byteBufferPool.acquire(httpClient.getRequestBufferSize(), useDirectByteBuffers);
break;
}
case FLUSH:
@ -307,6 +310,7 @@ public class HttpSenderOverHTTP extends HttpSender
private void release()
{
HttpClient httpClient = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = httpClient.getByteBufferPool();
if (!BufferUtil.isTheEmptyBuffer(headerBuffer))
bufferPool.release(headerBuffer);

View File

@ -32,6 +32,7 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -51,6 +52,7 @@ public class PathContentProvider extends AbstractTypedContentProvider
private final long fileSize;
private final int bufferSize;
private ByteBufferPool bufferPool;
private boolean useDirectByteBuffers = true;
public PathContentProvider(Path filePath) throws IOException
{
@ -101,6 +103,16 @@ public class PathContentProvider extends AbstractTypedContentProvider
this.bufferPool = byteBufferPool;
}
public boolean isUseDirectByteBuffers()
{
return useDirectByteBuffers;
}
public void setUseDirectByteBuffers(boolean useDirectByteBuffers)
{
this.useDirectByteBuffers = useDirectByteBuffers;
}
@Override
public Iterator<ByteBuffer> iterator()
{
@ -127,8 +139,8 @@ public class PathContentProvider extends AbstractTypedContentProvider
if (channel == null)
{
buffer = bufferPool == null
? ByteBuffer.allocateDirect(bufferSize)
: bufferPool.acquire(bufferSize, true);
? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers())
: bufferPool.acquire(bufferSize, isUseDirectByteBuffers());
channel = Files.newByteChannel(filePath, StandardOpenOption.READ);
if (LOG.isDebugEnabled())
LOG.debug("Opened file {}", filePath);

View File

@ -18,13 +18,14 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.DeferredContentProvider;
@ -33,7 +34,6 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@ -77,9 +77,9 @@ public class HttpClientFailureTest
client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination, promise);
HttpConnectionOverHTTP connection = (HttpConnectionOverHTTP)super.newConnection(endPoint, context);
connectionRef.set(connection);
return connection;
}
@ -107,9 +107,9 @@ public class HttpClientFailureTest
client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpConnectionOverHTTP connection = super.newHttpConnection(endPoint, destination, promise);
HttpConnectionOverHTTP connection = (HttpConnectionOverHTTP)super.newConnection(endPoint, context);
connectionRef.set(connection);
return connection;
}

View File

@ -18,13 +18,13 @@
package org.eclipse.jetty.client;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
@ -34,7 +34,6 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@ -115,9 +114,9 @@ public class HttpClientIdleTimeoutTest
}
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected boolean onIdleTimeout(long idleTimeout)

View File

@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -1529,9 +1530,9 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
public void onOpen()

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -28,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
@ -38,7 +38,6 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;
@ -158,9 +157,9 @@ public class HttpClientUploadDuringServerShutdownTest
HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()

View File

@ -292,6 +292,7 @@ public class MultiPartContentProviderTest extends AbstractHttpClientServerTest
MultiPartContentProvider multiPart = new MultiPartContentProvider();
PathContentProvider content = new PathContentProvider(contentType, tmpPath);
content.setByteBufferPool(client.getByteBufferPool());
content.setUseDirectByteBuffers(client.isUseOutputDirectByteBuffers());
multiPart.addFilePart(name, tmpPath.getFileName().toString(), content, null);
multiPart.close();
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())

View File

@ -135,8 +135,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
// TODO: configure directness.
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), true);
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}
private void releaseNetworkBuffer()

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.util.Locale;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpContent;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpSender;
@ -44,7 +45,8 @@ public class HttpSenderOverFCGI extends HttpSender
public HttpSenderOverFCGI(HttpChannel channel)
{
super(channel);
this.generator = new ClientGenerator(channel.getHttpDestination().getHttpClient().getByteBufferPool());
HttpClient httpClient = channel.getHttpDestination().getHttpClient();
this.generator = new ClientGenerator(httpClient.getByteBufferPool(), httpClient.isUseOutputDirectByteBuffers());
}
@Override

View File

@ -40,7 +40,12 @@ public class ClientGenerator extends Generator
public ClientGenerator(ByteBufferPool byteBufferPool)
{
super(byteBufferPool);
this(byteBufferPool, true);
}
public ClientGenerator(ByteBufferPool byteBufferPool, boolean useDirectByteBuffers)
{
super(byteBufferPool, useDirectByteBuffers);
}
public Result generateRequestHeaders(int request, HttpFields fields, Callback callback)
@ -79,9 +84,9 @@ public class ClientGenerator extends Generator
// One FCGI_BEGIN_REQUEST + N FCGI_PARAMS + one last FCGI_PARAMS
ByteBuffer beginRequestBuffer = byteBufferPool.acquire(16, false);
ByteBuffer beginRequestBuffer = acquire(16);
BufferUtil.clearToFill(beginRequestBuffer);
Result result = new Result(byteBufferPool, callback);
Result result = new Result(getByteBufferPool(), callback);
result = result.append(beginRequestBuffer, true);
// Generate the FCGI_BEGIN_REQUEST frame
@ -95,7 +100,7 @@ public class ClientGenerator extends Generator
while (fieldsLength > 0)
{
int capacity = 8 + Math.min(maxCapacity, fieldsLength);
ByteBuffer buffer = byteBufferPool.acquire(capacity, true);
ByteBuffer buffer = acquire(capacity);
BufferUtil.clearToFill(buffer);
result = result.append(buffer, true);
@ -132,7 +137,7 @@ public class ClientGenerator extends Generator
BufferUtil.flipToFlush(buffer, 0);
}
ByteBuffer lastParamsBuffer = byteBufferPool.acquire(8, false);
ByteBuffer lastParamsBuffer = acquire(8);
BufferUtil.clearToFill(lastParamsBuffer);
result = result.append(lastParamsBuffer, true);

View File

@ -31,11 +31,23 @@ public class Generator
{
public static final int MAX_CONTENT_LENGTH = 0xFF_FF;
protected final ByteBufferPool byteBufferPool;
private final ByteBufferPool byteBufferPool;
private final boolean useDirectByteBuffers;
public Generator(ByteBufferPool byteBufferPool)
public Generator(ByteBufferPool byteBufferPool, boolean useDirectByteBuffers)
{
this.byteBufferPool = byteBufferPool;
this.useDirectByteBuffers = useDirectByteBuffers;
}
ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
ByteBuffer acquire(int capacity)
{
return byteBufferPool.acquire(capacity, useDirectByteBuffers);
}
protected Result generateContent(int id, ByteBuffer content, boolean recycle, boolean lastContent, Callback callback, FCGI.FrameType frameType)
@ -47,7 +59,7 @@ public class Generator
while (contentLength > 0 || lastContent)
{
ByteBuffer buffer = byteBufferPool.acquire(8, false);
ByteBuffer buffer = acquire(8);
BufferUtil.clearToFill(buffer);
result = result.append(buffer, true);

View File

@ -42,12 +42,12 @@ public class ServerGenerator extends Generator
public ServerGenerator(ByteBufferPool byteBufferPool)
{
this(byteBufferPool, true);
this(byteBufferPool, true, true);
}
public ServerGenerator(ByteBufferPool byteBufferPool, boolean sendStatus200)
public ServerGenerator(ByteBufferPool byteBufferPool, boolean useDirectByteBuffers, boolean sendStatus200)
{
super(byteBufferPool);
super(byteBufferPool, useDirectByteBuffers);
this.sendStatus200 = sendStatus200;
}
@ -55,7 +55,7 @@ public class ServerGenerator extends Generator
{
request &= 0xFF_FF;
final Charset utf8 = StandardCharsets.UTF_8;
Charset utf8 = StandardCharsets.UTF_8;
List<byte[]> bytes = new ArrayList<>(fields.size() * 2);
int length = 0;
@ -88,7 +88,7 @@ public class ServerGenerator extends Generator
// End of headers
length += EOL.length;
final ByteBuffer buffer = byteBufferPool.acquire(length, true);
ByteBuffer buffer = acquire(length);
BufferUtil.clearToFill(buffer);
for (int i = 0; i < bytes.size(); i += 2)
@ -106,7 +106,7 @@ public class ServerGenerator extends Generator
{
if (aborted)
{
Result result = new Result(byteBufferPool, callback);
Result result = new Result(getByteBufferPool(), callback);
if (lastContent)
result.append(generateEndRequest(request, true), true);
else
@ -125,7 +125,7 @@ public class ServerGenerator extends Generator
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
ByteBuffer endRequestBuffer = acquire(8);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);

View File

@ -43,9 +43,9 @@ public class HttpTransportOverFCGI implements HttpTransport
private volatile boolean shutdown;
private volatile boolean aborted;
public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, Flusher flusher, int request, boolean sendStatus200)
public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, boolean useDirectByteBuffers, boolean sendStatus200, Flusher flusher, int request)
{
this.generator = new ServerGenerator(byteBufferPool, sendStatus200);
this.generator = new ServerGenerator(byteBufferPool, useDirectByteBuffers, sendStatus200);
this.flusher = flusher;
this.request = request;
}

View File

@ -47,6 +47,8 @@ public class ServerFCGIConnection extends AbstractConnection
private final Flusher flusher;
private final HttpConfiguration configuration;
private final ServerParser parser;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
@ -58,6 +60,26 @@ public class ServerFCGIConnection extends AbstractConnection
this.parser = new ServerParser(new ServerListener());
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@Override
public void onOpen()
{
@ -70,7 +92,7 @@ public class ServerFCGIConnection extends AbstractConnection
{
EndPoint endPoint = getEndPoint();
ByteBufferPool bufferPool = connector.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), true);
ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers());
try
{
while (true)
@ -133,7 +155,7 @@ public class ServerFCGIConnection extends AbstractConnection
{
// TODO: handle flags
HttpChannelOverFCGI channel = new HttpChannelOverFCGI(connector, configuration, getEndPoint(),
new HttpTransportOverFCGI(connector.getByteBufferPool(), flusher, request, sendStatus200));
new HttpTransportOverFCGI(connector.getByteBufferPool(), isUseOutputDirectByteBuffers(), sendStatus200, flusher, request));
HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel);
if (existing != null)
throw new IllegalStateException();

View File

@ -23,11 +23,16 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class ServerFCGIConnectionFactory extends AbstractConnectionFactory
{
private final HttpConfiguration configuration;
private final boolean sendStatus200;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
public ServerFCGIConnectionFactory(HttpConfiguration configuration)
{
@ -39,11 +44,38 @@ public class ServerFCGIConnectionFactory extends AbstractConnectionFactory
super("fcgi/1.0");
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
setUseInputDirectByteBuffers(configuration.isUseInputDirectByteBuffers());
setUseOutputDirectByteBuffers(configuration.isUseOutputDirectByteBuffers());
}
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return new ServerFCGIConnection(connector, endPoint, configuration, sendStatus200);
ServerFCGIConnection connection = new ServerFCGIConnection(connector, endPoint, configuration, sendStatus200);
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
return configure(connection, connector, endPoint);
}
}

View File

@ -116,6 +116,8 @@ public class HTTP2Client extends ContainerLifeCycle
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
public HTTP2Client()
{
@ -315,6 +317,28 @@ public class HTTP2Client extends ContainerLifeCycle
this.maxSettingsKeys = maxSettingsKeys;
}
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{
// Prior-knowledge clear-text HTTP/2 (h2c).

View File

@ -51,33 +51,35 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
final HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
final ByteBufferPool byteBufferPool = client.getByteBufferPool();
final Executor executor = client.getExecutor();
final Scheduler scheduler = client.getScheduler();
final Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = client.getByteBufferPool();
Executor executor = client.getExecutor();
Scheduler scheduler = client.getScheduler();
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
final Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
final Generator generator = new Generator(byteBufferPool);
final FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
final HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Generator generator = new Generator(byteBufferPool);
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
session.setMaxRemoteStreams(client.getMaxConcurrentPushedStreams());
long streamIdleTimeout = client.getStreamIdleTimeout();
if (streamIdleTimeout > 0)
session.setStreamIdleTimeout(streamIdleTimeout);
final Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());
final HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers());
connection.addEventListener(connectionListener);
return customize(connection, context);
}
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
private static class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
@ -154,7 +156,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
}
}
private class ConnectionListener implements Connection.Listener
private static class ConnectionListener implements Connection.Listener
{
@Override
public void onOpened(Connection connection)

View File

@ -450,7 +450,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
{
long total = 0;
for (ByteBuffer buffer : buffers)
{
total += buffer.remaining();
}
return total;
}
@ -463,7 +465,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
throw new BufferOverflowException();
ByteBuffer result = BufferUtil.allocateDirect((int)capacity);
for (ByteBuffer buffer : buffers)
{
BufferUtil.append(result, buffer);
}
return result;
}
@ -523,7 +527,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
if (frame.isEndStream())
{
if (buffer.hasRemaining())
offer(buffer, Callback.from(() -> {}, callback::failed), null);
offer(buffer, Callback.from(Callback.NOOP::succeeded, callback::failed), null);
offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF);
}
else

View File

@ -107,6 +107,7 @@ public class ContinuationBodyParser extends BodyParser
{
ByteBuffer headerBlock = headerBlockFragments.complete();
MetaData metaData = headerBlockParser.parse(headerBlock, headerBlock.remaining());
headerBlockFragments.getByteBufferPool().release(headerBlock);
if (metaData == null)
return true;
if (metaData == HeaderBlockParser.SESSION_FAILURE)

View File

@ -21,20 +21,33 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.io.ByteBufferPool;
public class HeaderBlockFragments
{
private final ByteBufferPool byteBufferPool;
private PriorityFrame priorityFrame;
private boolean endStream;
private int streamId;
private ByteBuffer storage;
public HeaderBlockFragments(ByteBufferPool byteBufferPool)
{
this.byteBufferPool = byteBufferPool;
}
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public void storeFragment(ByteBuffer fragment, int length, boolean last)
{
if (storage == null)
{
int space = last ? length : length * 2;
storage = ByteBuffer.allocate(space);
storage = byteBufferPool.acquire(space, fragment.isDirect());
storage.clear();
}
// Grow the storage if necessary.
@ -42,9 +55,11 @@ public class HeaderBlockFragments
{
int space = last ? length : length * 2;
int capacity = storage.position() + space;
ByteBuffer newStorage = ByteBuffer.allocate(capacity);
ByteBuffer newStorage = byteBufferPool.acquire(capacity, storage.isDirect());
newStorage.clear();
storage.flip();
newStorage.put(storage);
byteBufferPool.release(storage);
storage = newStorage;
}

View File

@ -73,7 +73,7 @@ public class HeaderBlockParser
{
if (blockBuffer == null)
{
blockBuffer = byteBufferPool.acquire(blockLength, false);
blockBuffer = byteBufferPool.acquire(blockLength, buffer.isDirect());
BufferUtil.clearToFill(blockBuffer);
}
blockBuffer.put(buffer);

View File

@ -78,7 +78,7 @@ public class Parser
Listener listener = wrapper.apply(this.listener);
unknownBodyParser = new UnknownBodyParser(headerParser, listener);
HeaderBlockParser headerBlockParser = new HeaderBlockParser(headerParser, byteBufferPool, hpackDecoder, unknownBodyParser);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments();
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(byteBufferPool);
bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener);

View File

@ -95,6 +95,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
client.setConnectTimeout(httpClient.getConnectTimeout());
client.setIdleTimeout(httpClient.getIdleTimeout());
client.setInputBufferSize(httpClient.getResponseBufferSize());
client.setUseInputDirectByteBuffers(httpClient.isUseInputDirectByteBuffers());
client.setUseOutputDirectByteBuffers(httpClient.isUseOutputDirectByteBuffers());
}
addBean(client);
super.doStart();

View File

@ -100,6 +100,8 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
httpClient.setExecutor(executor);
httpClient.setConnectTimeout(13);
httpClient.setIdleTimeout(17);
httpClient.setUseInputDirectByteBuffers(false);
httpClient.setUseOutputDirectByteBuffers(false);
httpClient.start();
@ -109,6 +111,8 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
assertSame(httpClient.getByteBufferPool(), http2Client.getByteBufferPool());
assertEquals(httpClient.getConnectTimeout(), http2Client.getConnectTimeout());
assertEquals(httpClient.getIdleTimeout(), http2Client.getIdleTimeout());
assertEquals(httpClient.isUseInputDirectByteBuffers(), http2Client.isUseInputDirectByteBuffers());
assertEquals(httpClient.isUseOutputDirectByteBuffers(), http2Client.isUseOutputDirectByteBuffers());
httpClient.stop();

View File

@ -64,8 +64,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private RateControl.Factory rateControlFactory = new WindowRateControl.Factory(20);
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private boolean _useInputDirectByteBuffers;
private boolean _useOutputDirectByteBuffers;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@ -215,24 +215,26 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.rateControlFactory = Objects.requireNonNull(rateControlFactory);
}
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return _useInputDirectByteBuffers;
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
_useInputDirectByteBuffers = useInputDirectByteBuffers;
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectByteBuffers;
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
_useOutputDirectByteBuffers = useOutputDirectByteBuffers;
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
public HttpConfiguration getHttpConfiguration()

View File

@ -40,14 +40,13 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.unixsocket.common.UnixSocketEndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
@ -57,6 +56,8 @@ public class HttpClientTransportOverUnixSockets extends AbstractConnectorHttpCli
{
private static final Logger LOG = Log.getLogger(HttpClientTransportOverUnixSockets.class);
private final ClientConnectionFactory factory = new HttpClientConnectionFactory();
public HttpClientTransportOverUnixSockets(String unixSocket)
{
this(new UnixSocketClientConnector(unixSocket));
@ -86,20 +87,12 @@ public class HttpClientTransportOverUnixSockets extends AbstractConnectorHttpCli
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
org.eclipse.jetty.io.Connection connection = newHttpConnection(endPoint, destination, promise);
var connection = factory.newConnection(endPoint, context);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", connection);
return customize(connection, context);
}
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise);
return connection;
}
private static class UnixSocketClientConnector extends ClientConnector

View File

@ -135,6 +135,20 @@ public class BufferUtil
return buf;
}
/**
* Allocates a ByteBuffer in flush mode.
* The position and limit will both be zero, indicating that the buffer is
* empty and must be flipped before any data is put to it.
*
* @param capacity capacity of the allocated ByteBuffer
* @param direct whether the ByteBuffer is direct
* @return the newly allocated ByteBuffer
*/
public static ByteBuffer allocate(int capacity, boolean direct)
{
return direct ? allocateDirect(capacity) : allocate(capacity);
}
/**
* Deep copy of a buffer
*

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@ -127,9 +128,9 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
return new HttpClientTransportOverHTTP(clientConnector)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
@ -210,9 +211,9 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
return new HttpClientTransportOverUnixSockets(scenario.sockFile.toString())
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
return new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()