use RetainableByteBuffer for getBuffer in HttpContent

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2022-10-20 21:29:55 +11:00
parent 8768725de9
commit 0e1bae4059
17 changed files with 133 additions and 95 deletions

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Set;
import java.util.SortedSet;
@ -24,6 +23,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.resource.Resource;
@ -42,13 +45,20 @@ public class CachingHttpContentFactory implements HttpContent.Factory
private final HttpContent.Factory _authority;
private final ConcurrentMap<String, CachingHttpContent> _cache = new ConcurrentHashMap<>();
private final AtomicLong _cachedSize = new AtomicLong();
private final RetainableByteBufferPool _byteBufferPool;
private int _maxCachedFileSize = 128 * 1024 * 1024;
private int _maxCachedFiles = 2048;
private int _maxCacheSize = 256 * 1024 * 1024;
public CachingHttpContentFactory(HttpContent.Factory authority)
{
this(authority, null);
}
public CachingHttpContentFactory(HttpContent.Factory authority, RetainableByteBufferPool byteBufferPool)
{
_authority = authority;
_byteBufferPool = (byteBufferPool == null) ? new NoopByteBufferPool().asRetainableByteBufferPool() : byteBufferPool;
}
protected ConcurrentMap<String, CachingHttpContent> getCache()
@ -125,6 +135,7 @@ public class CachingHttpContentFactory implements HttpContent.Factory
});
sorted.addAll(_cache.values());
// TODO: Can we remove the buffers from the content before evicting.
// Invalidate least recently used first
for (CachingHttpContent content : sorted)
{
@ -232,9 +243,9 @@ public class CachingHttpContentFactory implements HttpContent.Factory
boolean isValid();
}
protected static class CachedHttpContent extends HttpContentWrapper implements CachingHttpContent
protected class CachedHttpContent extends HttpContentWrapper implements CachingHttpContent
{
private final ByteBuffer _buffer;
private final RetainableByteBuffer _buffer;
private final String _cacheKey;
private final HttpField _etagField;
private final long _contentLengthValue;
@ -262,7 +273,23 @@ public class CachingHttpContentFactory implements HttpContent.Factory
_etagField = etagField;
// Map the content into memory if possible.
_buffer = httpContent.getBuffer();
RetainableByteBuffer buffer = null;
try
{
long contentLengthValue = getContentLengthValue();
if (contentLengthValue > _maxCachedFileSize)
{
buffer = _byteBufferPool.acquire((int)contentLengthValue, false);
BufferUtil.readFrom(httpContent.getResource().newReadableByteChannel(), contentLengthValue, buffer.getBuffer());
}
}
catch (Throwable t)
{
buffer = null;
LOG.warn("Failed to read Resource", t);
}
_buffer = buffer;
_contentLengthValue = httpContent.getContentLengthValue();
_lastModifiedValue = httpContent.getLastModifiedValue();
_characterEncoding = httpContent.getCharacterEncoding();
@ -282,13 +309,9 @@ public class CachingHttpContentFactory implements HttpContent.Factory
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
// TODO this should return a RetainableByteBuffer otherwise there is a race between
// threads serving the buffer while another thread invalidates it. That's going to
// be a lot of fun since RetainableByteBuffer is only meant to be acquired from a pool
// but the byte buffer here could be coming from a mmap'ed file.
return _buffer.slice();
return _buffer;
}
@Override
@ -312,7 +335,8 @@ public class CachingHttpContentFactory implements HttpContent.Factory
@Override
public void release()
{
// TODO re-pool buffer and release precompressed contents
if (_buffer != null)
_buffer.release();
}
@Override
@ -491,7 +515,7 @@ public class CachingHttpContentFactory implements HttpContent.Factory
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
return null;
}

View File

@ -43,20 +43,23 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class EvictingCachingContentFactory extends CachingHttpContentFactory implements Runnable
{
private final Scheduler _scheduler;
private final long _delay;
private final long _sweepDelay;
private final long _validationTime;
private final long _maxCacheIdleTime;
public EvictingCachingContentFactory(HttpContent.Factory authority, long validationTime)
{
this(authority, validationTime, null, -1);
this(authority, validationTime, null, -1, -1);
}
public EvictingCachingContentFactory(HttpContent.Factory authority, long validationTime, Scheduler scheduler, long sweepDelay)
// TODO: ANNOTATION @Name
public EvictingCachingContentFactory(HttpContent.Factory authority, long validationTime, Scheduler scheduler, long sweepDelay, long maxCacheIdleTime)
{
super(authority);
_validationTime = validationTime;
_scheduler = scheduler;
_delay = sweepDelay;
_sweepDelay = sweepDelay;
_maxCacheIdleTime = maxCacheIdleTime;
if (scheduler != null && sweepDelay > 0)
schedule();
}
@ -71,7 +74,7 @@ public class EvictingCachingContentFactory extends CachingHttpContentFactory imp
private void schedule()
{
_scheduler.schedule(this, _delay, TimeUnit.MILLISECONDS);
_scheduler.schedule(this, _sweepDelay, TimeUnit.MILLISECONDS);
}
@Override
@ -81,7 +84,9 @@ public class EvictingCachingContentFactory extends CachingHttpContentFactory imp
for (Map.Entry<String, CachingHttpContent> entry : cache.entrySet())
{
CachingHttpContent value = entry.getValue();
if (!value.isValid())
if (_maxCacheIdleTime > 0 && NanoTime.since(value.getLastAccessedNanos()) > TimeUnit.MILLISECONDS.toNanos(_maxCacheIdleTime))
removeFromCache(value);
else if (!value.isValid())
removeFromCache(value);
}
schedule();
@ -99,7 +104,7 @@ public class EvictingCachingContentFactory extends CachingHttpContentFactory imp
return new EvictingNotFoundContent(p, _validationTime);
}
protected static class EvictingCachedContent extends CachedHttpContent
protected class EvictingCachedContent extends CachedHttpContent
{
private final long _validationTime;
private final AtomicLong _lastValidated = new AtomicLong();

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,11 +53,12 @@ public class FileMappedHttpContentFactory implements HttpContent.Factory
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
try
{
return BufferUtil.toMappedBuffer(content.getResource().getPath());
ByteBuffer buffer = BufferUtil.toMappedBuffer(content.getResource().getPath());
return new RetainableByteBuffer(buffer, b -> {});
}
catch (Throwable t)
{

View File

@ -14,11 +14,11 @@
package org.eclipse.jetty.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Set;
import org.eclipse.jetty.http.MimeTypes.Type;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.resource.Resource;
/**
@ -61,7 +61,7 @@ public interface HttpContent
Resource getResource();
ByteBuffer getBuffer();
RetainableByteBuffer getBuffer();
/**
* @return Set of available pre-compressed formats for this content, or null if this has not been checked.

View File

@ -13,11 +13,11 @@
package org.eclipse.jetty.http;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Set;
import org.eclipse.jetty.http.MimeTypes.Type;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.resource.Resource;
/**
@ -122,7 +122,7 @@ public class HttpContentWrapper implements HttpContent
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
return _delegate.getBuffer();
}

View File

@ -13,11 +13,11 @@
package org.eclipse.jetty.http;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Set;
import org.eclipse.jetty.http.MimeTypes.Type;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.resource.Resource;
public class PreCompressedHttpContent implements HttpContent
@ -138,7 +138,7 @@ public class PreCompressedHttpContent implements HttpContent
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
return _precompressedContent.getBuffer();
}

View File

@ -13,14 +13,12 @@
package org.eclipse.jetty.http;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Set;
import org.eclipse.jetty.http.MimeTypes.Type;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.resource.Resource;
/**
@ -140,26 +138,9 @@ public class ResourceHttpContent implements HttpContent
}
@Override
public ByteBuffer getBuffer()
public RetainableByteBuffer getBuffer()
{
try
{
long contentLengthValue = getContentLengthValue();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect((int)contentLengthValue);
try (SeekableByteChannel channel = Files.newByteChannel(getResource().getPath()))
{
// fill buffer
int read = 0;
while (read != contentLengthValue)
read += channel.read(byteBuffer);
}
byteBuffer.flip();
return byteBuffer;
}
catch (Throwable t)
{
return null;
}
return null;
}
@Override

View File

@ -38,7 +38,7 @@ public class RetainableByteBuffer extends Retainable.ReferenceCounter
private final Consumer<RetainableByteBuffer> releaser;
private final AtomicLong lastUpdate = new AtomicLong(NanoTime.now());
RetainableByteBuffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
public RetainableByteBuffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
super(0);
this.releaser = releaser;

View File

@ -44,6 +44,8 @@ import org.eclipse.jetty.http.QuotedCSV;
import org.eclipse.jetty.http.QuotedQualityCSV;
import org.eclipse.jetty.http.ResourceHttpContent;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -633,11 +635,21 @@ public class ResourceService
{
try
{
ByteBuffer buffer = content.getBuffer();
RetainableByteBuffer buffer = content.getBuffer();
if (buffer != null)
response.write(true, buffer, callback);
{
buffer.retain();
response.write(true, buffer.getBuffer(), Callback.from(callback, buffer::release));
}
else
{
// TODO: is it possible to do zero-copy transfer?
// WritableByteChannel c = Response.asWritableByteChannel(target);
// FileChannel fileChannel = (FileChannel) source;
// fileChannel.transferTo(0, contentLength, c);
new ContentWriterIteratingCallback(content, response, callback).iterate();
}
}
catch (Throwable x)
{
@ -840,50 +852,49 @@ public class ResourceService
private final ReadableByteChannel source;
private final Content.Sink sink;
private final Callback callback;
private final ByteBuffer byteBuffer;
private final RetainableByteBuffer byteBuffer;
public ContentWriterIteratingCallback(HttpContent content, Response target, Callback callback) throws IOException
{
// TODO: is it possible to do zero-copy transfer?
// WritableByteChannel c = Response.asWritableByteChannel(target);
// FileChannel fileChannel = (FileChannel) source;
// fileChannel.transferTo(0, contentLength, c);
RetainableByteBufferPool byteBufferPool = target.getRequest().getComponents().getByteBufferPool().asRetainableByteBufferPool();
this.source = Files.newByteChannel(content.getResource().getPath());
this.sink = target;
this.callback = callback;
int outputBufferSize = target.getRequest().getConnectionMetaData().getHttpConfiguration().getOutputBufferSize();
boolean useOutputDirectByteBuffers = target.getRequest().getConnectionMetaData().getHttpConfiguration().isUseOutputDirectByteBuffers();
this.byteBuffer = useOutputDirectByteBuffers ? ByteBuffer.allocateDirect(outputBufferSize) : ByteBuffer.allocate(outputBufferSize); // TODO use pool
this.byteBuffer = byteBufferPool.acquire(outputBufferSize, useOutputDirectByteBuffers);
}
@Override
protected Action process() throws Throwable
{
// TODO: use BufferUtil
if (!source.isOpen())
return Action.SUCCEEDED;
byteBuffer.clear();
int read = source.read(byteBuffer);
int read = source.read(byteBuffer.getBuffer());
if (read == -1)
{
IO.close(source);
sink.write(true, BufferUtil.EMPTY_BUFFER, this);
return Action.SCHEDULED;
}
byteBuffer.flip();
sink.write(false, byteBuffer, this);
byteBuffer.getBuffer().flip();
sink.write(false, byteBuffer.getBuffer(), this);
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
byteBuffer.release();
callback.succeeded();
}
@Override
protected void onCompleteFailure(Throwable x)
{
byteBuffer.release();
callback.failed(x);
}
}

View File

@ -595,6 +595,21 @@ public class BufferUtil
}
}
public static void readFrom(ReadableByteChannel newReadableByteChannel, long contentLengthValue, ByteBuffer byteBuffer) throws IOException
{
int pos = BufferUtil.flipToFill(byteBuffer);
try (ReadableByteChannel channel = newReadableByteChannel)
{
int read = 0;
while (read != contentLengthValue)
read += channel.read(byteBuffer);
}
finally
{
BufferUtil.flipToFlush(byteBuffer, pos);
}
}
public static void readFrom(InputStream is, int needed, ByteBuffer buffer) throws IOException
{
ByteBuffer tmp = allocate(8192);

View File

@ -138,12 +138,13 @@ public class DefaultServlet extends HttpServlet
if (getInitBoolean("useFileMappedBuffer", false))
contentFactory = new FileMappedHttpContentFactory(contentFactory);
CachingHttpContentFactory cached = new EvictingCachingContentFactory(contentFactory, Duration.ofSeconds(1).toMillis());
int maxCacheSize = getInitInt("maxCacheSize", -2);
int maxCachedFileSize = getInitInt("maxCachedFileSize", -2);
int maxCachedFiles = getInitInt("maxCachedFiles", -2);
if (maxCachedFiles != -2 || maxCacheSize != -2 || maxCachedFileSize != -2)
{
CachingHttpContentFactory cached = new EvictingCachingContentFactory(contentFactory, Duration.ofSeconds(1).toMillis());
contentFactory = cached;
if (maxCacheSize >= 0)
cached.setMaxCacheSize(maxCacheSize);
if (maxCachedFileSize >= -1)
@ -151,10 +152,7 @@ public class DefaultServlet extends HttpServlet
if (maxCachedFiles >= -1)
cached.setMaxCachedFiles(maxCachedFiles);
}
contentFactory = cached;
String resourceCache = getInitParameter("resourceCache");
getServletContext().setAttribute(resourceCache == null ? "resourceCache" : resourceCache, cached);
getServletContext().setAttribute(HttpContent.Factory.class.getName(), contentFactory);
}
_resourceService.setContentFactory(contentFactory);

View File

@ -35,6 +35,7 @@ import jakarta.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -1305,11 +1306,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (LOG.isDebugEnabled())
LOG.debug("sendContent(http={},{})", httpContent, callback);
ByteBuffer buffer = httpContent.getBuffer();
RetainableByteBuffer buffer = httpContent.getBuffer();
if (buffer != null)
{
sendContent(buffer, callback);
buffer.retain();
sendContent(buffer.getBuffer(), Callback.from(callback, buffer::release));
return;
}

View File

@ -17,7 +17,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
@ -49,6 +48,7 @@ import org.eclipse.jetty.http.PreCompressedHttpContent;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http.QuotedCSV;
import org.eclipse.jetty.http.QuotedQualityCSV;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.WriterOutputStream;
import org.eclipse.jetty.server.ResourceListing;
import org.eclipse.jetty.util.BufferUtil;
@ -854,10 +854,18 @@ public class ResourceService
if (start == 0 && content.getResource().length() == contentLength)
{
// attempt efficient ByteBuffer based write for whole content
ByteBuffer buffer = content.getBuffer();
RetainableByteBuffer buffer = content.getBuffer();
if (buffer != null)
{
BufferUtil.writeTo(buffer, out);
try
{
buffer.retain();
BufferUtil.writeTo(buffer.getBuffer(), out);
}
finally
{
buffer.release();
}
return;
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
/**
@ -24,18 +25,17 @@ import org.eclipse.jetty.util.BufferUtil;
*/
public class ByteBufferRangeWriter implements RangeWriter
{
private final ByteBuffer buffer;
private boolean closed = false;
private final RetainableByteBuffer buffer;
public ByteBufferRangeWriter(ByteBuffer buffer)
public ByteBufferRangeWriter(RetainableByteBuffer buffer)
{
this.buffer = buffer.asReadOnlyBuffer();
this.buffer = buffer;
}
@Override
public void close() throws IOException
{
closed = true;
buffer.release();
}
@Override
@ -51,7 +51,7 @@ public class ByteBufferRangeWriter implements RangeWriter
throw new IllegalArgumentException("Unsupported length " + skipTo + " > " + Integer.MAX_VALUE);
}
ByteBuffer src = buffer.slice();
ByteBuffer src = buffer.getBuffer().slice();
src.position((int)skipTo);
src.limit(Math.addExact((int)skipTo, (int)length));
BufferUtil.writeTo(src, outputStream);

View File

@ -14,11 +14,11 @@
package org.eclipse.jetty.ee9.nested.resource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Objects;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,7 +40,7 @@ public class HttpContentRangeWriter
Objects.requireNonNull(content, "HttpContent");
// Try direct buffer
ByteBuffer buffer = content.getBuffer();
RetainableByteBuffer buffer = content.getBuffer();
if (buffer != null)
return new ByteBufferRangeWriter(buffer);

View File

@ -250,12 +250,13 @@ public class DefaultServlet extends HttpServlet implements ResourceFactory, Welc
if (_useFileMappedBuffer)
contentFactory = new FileMappedHttpContentFactory(contentFactory);
_cachingContentFactory = new EvictingCachingContentFactory(contentFactory, Duration.ofSeconds(1).toMillis());
int maxCacheSize = getInitInt("maxCacheSize", -2);
int maxCachedFileSize = getInitInt("maxCachedFileSize", -2);
int maxCachedFiles = getInitInt("maxCachedFiles", -2);
if (maxCachedFiles != -2 || maxCacheSize != -2 || maxCachedFileSize != -2)
{
_cachingContentFactory = new EvictingCachingContentFactory(contentFactory, Duration.ofSeconds(1).toMillis());
contentFactory = _cachingContentFactory;
if (maxCacheSize >= 0)
_cachingContentFactory.setMaxCacheSize(maxCacheSize);
if (maxCachedFileSize >= -1)
@ -263,10 +264,7 @@ public class DefaultServlet extends HttpServlet implements ResourceFactory, Welc
if (maxCachedFiles >= -1)
_cachingContentFactory.setMaxCachedFiles(maxCachedFiles);
}
contentFactory = _cachingContentFactory;
String resourceCache = getInitParameter("resourceCache");
getServletContext().setAttribute(resourceCache == null ? "resourceCache" : resourceCache, _cachingContentFactory);
getServletContext().setAttribute(HttpContent.Factory.class.getName(), contentFactory);
}
_resourceService.setContentFactory(contentFactory);
_resourceService.setWelcomeFactory(this);

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
@ -47,7 +46,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.http.ResourceHttpContentFactory;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.AllowedResourceAliasChecker;
import org.eclipse.jetty.server.HttpConfiguration;
@ -1214,7 +1213,6 @@ public class DefaultServletTest
}
@Test
@Disabled // TODO
public void testDirectFromResourceHttpContent() throws Exception
{
FS.ensureDirExists(docRoot);
@ -1237,14 +1235,11 @@ public class DefaultServletTest
assertThat(response.toString(), response.getStatus(), is(HttpStatus.OK_200));
assertThat(response.getContent(), containsString("<h1>Hello World</h1>"));
ResourceHttpContentFactory factory = (ResourceHttpContentFactory)context.getServletContext().getAttribute("resourceCache");
HttpContent.Factory factory = (HttpContent.Factory)context.getServletContext().getAttribute(HttpContent.Factory.class.getName());
HttpContent content = factory.getContent("/index.html");
ByteBuffer buffer = content.getBuffer();
assertThat("Buffer is direct", buffer.isDirect(), is(true));
content = factory.getContent("/index.html");
buffer = content.getBuffer();
assertThat("Direct buffer", buffer, is(nullValue()));
RetainableByteBuffer buffer = content.getBuffer();
assertThat("Buffer is null", buffer, is(nullValue()));
}
@SuppressWarnings("Duplicates")