Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

This commit is contained in:
Greg Wilkins 2018-11-22 12:14:49 +01:00
commit 0462212446
2 changed files with 90 additions and 112 deletions

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -31,10 +30,10 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Retainable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -217,8 +216,8 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private void setInputBuffer(ByteBuffer byteBuffer)
{
if (networkBuffer == null)
networkBuffer = acquireNetworkBuffer();
acquireNetworkBuffer();
// TODO handle buffer overflow?
networkBuffer.put(byteBuffer);
}
@ -234,93 +233,104 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
if (isFillInterested() || shutdown || failed)
return null;
if (networkBuffer == null)
networkBuffer = acquireNetworkBuffer();
boolean parse = networkBuffer.hasRemaining();
while (true)
boolean interested = false;
acquireNetworkBuffer();
try
{
if (parse)
boolean parse = networkBuffer.hasRemaining();
while (true)
{
boolean released;
networkBuffer.retain();
try
if (parse)
{
while (networkBuffer.hasRemaining())
{
parser.parse(networkBuffer.buffer);
parser.parse(networkBuffer.getBuffer());
if (failed)
return null;
}
}
finally
{
released = networkBuffer.release();
if (failed && released)
releaseNetworkBuffer();
task = pollTask();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task);
if (task != null)
return task;
// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
reacquireNetworkBuffer();
}
task = pollTask();
// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Dequeued new task {}", task);
if (task != null)
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
if (filled > 0)
{
if (released)
releaseNetworkBuffer();
else
networkBuffer = null;
return task;
bytesIn.addAndGet(filled);
parse = true;
}
else if (filled == 0)
{
interested = true;
return null;
}
else
{
if (!released)
networkBuffer = acquireNetworkBuffer();
shutdown = true;
session.onShutdown();
return null;
}
}
// Here we know that this.buffer is not retained:
// either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.buffer);
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
if (filled > 0)
{
bytesIn.addAndGet(filled);
parse = true;
}
else if (filled == 0)
{
releaseNetworkBuffer();
}
finally
{
releaseNetworkBuffer();
if (interested)
getEndPoint().fillInterested(fillableCallback);
return null;
}
else
{
releaseNetworkBuffer();
shutdown = true;
session.onShutdown();
return null;
}
}
}
private NetworkBuffer acquireNetworkBuffer()
private void acquireNetworkBuffer()
{
NetworkBuffer networkBuffer = new NetworkBuffer();
if (networkBuffer == null)
{
networkBuffer = new NetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
}
private void reacquireNetworkBuffer()
{
NetworkBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();
if (currentBuffer.getBuffer().hasRemaining())
throw new IllegalStateException();
currentBuffer.release();
networkBuffer = new NetworkBuffer();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
return networkBuffer;
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
}
private void releaseNetworkBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("Released {}", networkBuffer);
networkBuffer.recycle();
NetworkBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();
if (currentBuffer.hasRemaining() && !shutdown && !failed)
throw new IllegalStateException();
currentBuffer.release();
networkBuffer = null;
if (LOG.isDebugEnabled())
LOG.debug("Released {}", currentBuffer);
}
@Override
@ -375,56 +385,36 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
}
}
private class NetworkBuffer implements Callback, Retainable
private class NetworkBuffer extends RetainableByteBuffer implements Callback
{
private final AtomicInteger refCount = new AtomicInteger();
private final ByteBuffer buffer;
private NetworkBuffer()
{
buffer = byteBufferPool.acquire(bufferSize, false); // TODO: make directness customizable
super(byteBufferPool,bufferSize,false);
}
private void put(ByteBuffer source)
{
BufferUtil.append(buffer, source);
}
private boolean hasRemaining()
{
return buffer.hasRemaining();
}
@Override
public void retain()
{
refCount.incrementAndGet();
}
private boolean release()
{
return refCount.decrementAndGet() == 0;
BufferUtil.append(getBuffer(), source);
}
@Override
public void succeeded()
{
if (release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this);
recycle();
}
completed(null);
}
@Override
public void failed(Throwable failure)
{
if (release())
completed(failure);
}
private void completed(Throwable failure)
{
if (release() == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained " + this, failure);
recycle();
}
}
@ -433,16 +423,5 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
{
return InvocationType.NON_BLOCKING;
}
private void recycle()
{
byteBufferPool.release(buffer);
}
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), buffer);
}
}
}

View File

@ -18,12 +18,12 @@
package org.eclipse.jetty.io;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Retainable;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Retainable;
/**
* A Retainable ByteBuffer.
* <p>Acquires a ByteBuffer from a {@link ByteBufferPool} and maintains a reference count that is
@ -65,7 +65,7 @@ public class RetainableByteBuffer implements Retainable
{
int r = references.get();
if (r == 0)
throw new IllegalStateException("released");
throw new IllegalStateException("released " + this);
if (references.compareAndSet(r, r + 1))
break;
}
@ -76,9 +76,8 @@ public class RetainableByteBuffer implements Retainable
int ref = references.decrementAndGet();
if (ref == 0)
pool.release(buffer);
else if (ref < 0 )
throw new IllegalStateException("already released");
else if (ref < 0)
throw new IllegalStateException("already released " + this);
return ref;
}
@ -95,6 +94,6 @@ public class RetainableByteBuffer implements Retainable
@Override
public String toString()
{
return BufferUtil.toDetailString(buffer) + ":r=" + getReferences();
return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), getReferences());
}
}