Jetty 9.4.x 3117 retainable byte buffer (#3118)

* Move RetainableByteBuffer to jetty-io=
use RetainableByteBuffer
use RetainableByteBuffer - changes from review.
Reviewed and applied small changes.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2018-11-22 11:31:05 +01:00 committed by GitHub
parent 910665a55a
commit 89e1dd033a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 182 additions and 104 deletions

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -31,10 +30,10 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Retainable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -217,8 +216,8 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private void setInputBuffer(ByteBuffer byteBuffer)
{
if (networkBuffer == null)
networkBuffer = acquireNetworkBuffer();
acquireNetworkBuffer();
// TODO handle buffer overflow?
networkBuffer.put(byteBuffer);
}
@ -234,93 +233,104 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
if (isFillInterested() || shutdown || failed)
return null;
if (networkBuffer == null)
networkBuffer = acquireNetworkBuffer();
boolean parse = networkBuffer.hasRemaining();
while (true)
boolean interested = false;
acquireNetworkBuffer();
try
{
if (parse)
boolean parse = networkBuffer.hasRemaining();
while (true)
{
boolean released;
networkBuffer.retain();
try
if (parse)
{
while (networkBuffer.hasRemaining())
{
parser.parse(networkBuffer.buffer);
parser.parse(networkBuffer.getBuffer());
if (failed)
return null;
}
}
finally
{
released = networkBuffer.release();
if (failed && released)
releaseNetworkBuffer();
task = pollTask();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task);
if (task != null)
return task;
// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
reacquireNetworkBuffer();
}
task = pollTask();
// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task);
if (task != null)
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
if (filled > 0)
{
if (released)
releaseNetworkBuffer();
else
networkBuffer = null;
return task;
bytesIn.addAndGet(filled);
parse = true;
}
else if (filled == 0)
{
interested = true;
return null;
}
else
{
if (!released)
networkBuffer = acquireNetworkBuffer();
shutdown = true;
session.onShutdown();
return null;
}
}
// Here we know that this.buffer is not retained:
// either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.buffer);
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
if (filled > 0)
{
bytesIn.addAndGet(filled);
parse = true;
}
else if (filled == 0)
{
releaseNetworkBuffer();
}
finally
{
releaseNetworkBuffer();
if (interested)
getEndPoint().fillInterested(fillableCallback);
return null;
}
else
{
releaseNetworkBuffer();
shutdown = true;
session.onShutdown();
return null;
}
}
}
private NetworkBuffer acquireNetworkBuffer()
private void acquireNetworkBuffer()
{
NetworkBuffer networkBuffer = new NetworkBuffer();
if (networkBuffer == null)
{
networkBuffer = new NetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
}
private void reacquireNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();
if (currentBuffer.getBuffer().hasRemaining())
throw new IllegalStateException();
currentBuffer.release();
networkBuffer = new NetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
return networkBuffer;
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
}
private void releaseNetworkBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("Released {}", networkBuffer);
networkBuffer.recycle();
NetworkBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();
if (currentBuffer.hasRemaining() && !shutdown && !failed)
throw new IllegalStateException();
currentBuffer.release();
networkBuffer = null;
if (LOG.isDebugEnabled())
LOG.debug("Released {}", currentBuffer);
}
@Override
@ -375,56 +385,36 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
}
}
private class NetworkBuffer implements Callback, Retainable
private class NetworkBuffer extends RetainableByteBuffer implements Callback
{
private final AtomicInteger refCount = new AtomicInteger();
private final ByteBuffer buffer;
private NetworkBuffer()
{
buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
super(byteBufferPool,bufferSize,false);
}
private void put(ByteBuffer source)
{
BufferUtil.append(buffer, source);
}
private boolean hasRemaining()
{
return buffer.hasRemaining();
}
@Override
public void retain()
{
refCount.incrementAndGet();
}
private boolean release()
{
return refCount.decrementAndGet() == 0;
BufferUtil.append(getBuffer(), source);
}
@Override
public void succeeded()
{
if (release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this);
recycle();
}
completed(null);
}
@Override
public void failed(Throwable failure)
{
if (release())
completed(failure);
}
private void completed(Throwable failure)
{
if (release() == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained " + this, failure);
recycle();
}
}
@ -433,16 +423,5 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
{
return InvocationType.NON_BLOCKING;
}
private void recycle()
{
byteBufferPool.release(buffer);
}
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), buffer);
}
}
}

View File

@ -0,0 +1,99 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Retainable;
/**
* A Retainable ByteBuffer.
* <p>Acquires a ByteBuffer from a {@link ByteBufferPool} and maintains a reference count that is
* initially 1, incremented with {@link #retain()} and decremented with {@link #release()}. The buffer
* is released to the pool when the reference count is decremented to 0.</p>
*/
public class RetainableByteBuffer implements Retainable
{
private final ByteBufferPool pool;
private final ByteBuffer buffer;
private final AtomicInteger references;
public RetainableByteBuffer(ByteBufferPool pool, int size)
{
this(pool, size, false);
}
public RetainableByteBuffer(ByteBufferPool pool, int size, boolean direct)
{
this.pool = pool;
this.buffer = pool.acquire(size, direct);
this.references = new AtomicInteger(1);
}
public ByteBuffer getBuffer()
{
return buffer;
}
public int getReferences()
{
return references.get();
}
@Override
public void retain()
{
while (true)
{
int r = references.get();
if (r == 0)
throw new IllegalStateException("released " + this);
if (references.compareAndSet(r, r + 1))
break;
}
}
public int release()
{
int ref = references.decrementAndGet();
if (ref == 0)
pool.release(buffer);
else if (ref < 0)
throw new IllegalStateException("already released " + this);
return ref;
}
public boolean hasRemaining()
{
return buffer.hasRemaining();
}
public boolean isEmpty()
{
return !buffer.hasRemaining();
}
@Override
public String toString()
{
return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), getReferences());
}
}