Fixes #4971 - Simplify Connection.upgradeFrom()/upgradeTo(). (#5008)

Fixes #4971 - Simplify Connection.upgradeFrom()/upgradeTo().

Now the upgrade-from connection produces a "floating" buffer 
(not belonging to a pool), so that it can release the original buffer.

The upgrade-to connection is free to copy or store this "floating" buffer.

Strengthened ByteBufferPool behavior when releasing non-pooled
ByteBuffers: the buffer is now discarded.

Updated javadocs and all implementations.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-07-06 22:45:03 +02:00 committed by GitHub
parent 43353d0ca9
commit f2c6b67827
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 226 additions and 165 deletions

View File

@ -95,8 +95,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
protected void setInputBuffer(ByteBuffer buffer)
{
if (buffer != null)
producer.setInputBuffer(buffer);
producer.setInputBuffer(buffer);
}
@Override

View File

@ -434,11 +434,13 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
oldConnection.onClose();
oldConnection.getEndPoint().setConnection(newConnection);
if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(buffer);
else if (BufferUtil.hasContent(buffer))
throw new IllegalStateException("Cannot upgrade: " + newConnection + " does not implement " + Connection.UpgradeTo.class.getName());
if (BufferUtil.hasContent(buffer))
{
if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(buffer);
else
throw new IllegalStateException("Cannot upgrade: " + newConnection + " does not implement " + Connection.UpgradeTo.class.getName());
}
newConnection.onOpen();
}

View File

@ -23,8 +23,11 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.function.IntFunction;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p>
@ -35,6 +38,8 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class ArrayByteBufferPool extends AbstractByteBufferPool
{
private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class);
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
@ -119,8 +124,18 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
{
if (buffer == null)
return;
int capacity = buffer.capacity();
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}
boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), direct, this::newBucket);
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, this::newBucket);
if (bucket != null)
{
bucket.release(buffer);

View File

@ -96,31 +96,49 @@ public interface Connection extends Closeable
long getCreatedTimeStamp();
/**
* <p>{@link Connection} implementations implement this interface when they
* can upgrade from the protocol they speak (for example HTTP/1.1)
* to a different protocol (e.g. HTTP/2).</p>
*
* @see EndPoint#upgrade(Connection)
* @see UpgradeTo
*/
interface UpgradeFrom
{
/**
* <p>Takes the input buffer from the connection on upgrade.</p>
* <p>This method is used to take any unconsumed input from
* a connection during an upgrade.</p>
* <p>Invoked during an {@link EndPoint#upgrade(Connection) upgrade}
* to produce a buffer containing bytes that have not been consumed by
* this connection, and that must be consumed by the upgrade-to
* connection.</p>
*
* @return A buffer of unconsumed input. The caller must return the buffer
* to the bufferpool when consumed and this connection must not.
* @return a buffer of unconsumed bytes to pass to the upgrade-to connection.
* The buffer does not belong to any pool and should be discarded after
* having consumed its bytes.
* The returned buffer may be null if there are no unconsumed bytes.
*/
ByteBuffer onUpgradeFrom();
}
/**
* <p>{@link Connection} implementations implement this interface when they
* can be upgraded to the protocol they speak (e.g. HTTP/2)
* from a different protocol (e.g. HTTP/1.1).</p>
*/
interface UpgradeTo
{
/**
* <p>Callback method invoked when this connection is upgraded.</p>
* <p>This must be called before {@link #onOpen()}.</p>
* <p>Invoked during an {@link EndPoint#upgrade(Connection) upgrade}
* to receive a buffer containing bytes that have not been consumed by
* the upgrade-from connection, and that must be consumed by this
* connection.</p>
*
* @param prefilled An optional buffer that can contain prefilled data. Typically this
* results from an upgrade of one protocol to the other where the old connection has buffered
* data destined for the new connection. The new connection must take ownership of the buffer
* and is responsible for returning it to the buffer pool
* @param buffer a non-null buffer of unconsumed bytes received from
* the upgrade-from connection.
* The buffer does not belong to any pool and should be discarded after
* having consumed its bytes.
*/
void onUpgradeTo(ByteBuffer prefilled);
void onUpgradeTo(ByteBuffer buffer);
}
/**

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.Invocable;
/**
* A transport EndPoint
* <p>EndPoint is the abstraction for an I/O channel that transports bytes.</p>
*
* <h3>Asynchronous Methods</h3>
* <p>The asynchronous scheduling methods of {@link EndPoint}
@ -40,76 +40,79 @@ import org.eclipse.jetty.util.thread.Invocable;
* some inefficiencies.</p>
* <p>This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link IteratingCallback}. Examples are:</p>
* {@link IteratingCallback}.</p>
*
* <h3>Blocking Read</h3>
* <p>A FutureCallback can be used to block until an endpoint is ready to be filled
* from:</p>
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.fillInterested("ContextObj",future);
* ...
* String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);
* </pre></blockquote>
* <h3>Reads</h3>
* <p>A {@link FutureCallback} can be used to block until an endpoint is ready
* to fill bytes - the notification will be emitted by the NIO subsystem:</p>
* <pre>
* FutureCallback callback = new FutureCallback();
* endPoint.fillInterested(callback);
*
* <h3>Dispatched Read</h3>
* <p>By using a different callback, the read can be done asynchronously in its own dispatched thread:</p>
* <blockquote><pre>
* endpoint.fillInterested("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* // Blocks until read to fill bytes.
* callback.get();
*
* // Now bytes can be filled in a ByteBuffer.
* int filled = endPoint.fill(byteBuffer);
* </pre>
*
* <h3>Asynchronous Reads</h3>
* <p>A {@link Callback} can be used to read asynchronously in its own dispatched
* thread:</p>
* <pre>
* endPoint.fillInterested(new Callback()
* {
* public void onCompleted(String context)
* public void onSucceeded()
* {
* int filled=endpoint.fill(mybuffer);
* ...
* executor.execute(() -&gt;
* {
* // Fill bytes in a different thread.
* int filled = endPoint.fill(byteBuffer);
* });
* }
* public void onFailed(Throwable failure)
* {
* endPoint.close();
* }
* public void onFailed(String context,Throwable cause) {...}
* });
* </pre></blockquote>
* <p>The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.</p>
* </pre>
*
* <h3>Blocking Write</h3>
* <p>The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like:</p>
* <blockquote><pre>
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks
* </pre></blockquote>
* <h3>Blocking Writes</h3>
* <p>The write contract is that the callback is completed when all the bytes
* have been written or there is a failure.
* Blocking writes look like this:</p>
* <pre>
* FutureCallback callback = new FutureCallback();
* endpoint.write(callback, headerBuffer, contentBuffer);
*
* <h3>Dispatched Write</h3>
* <p>Note also that multiple buffers may be passed in write so that gather writes
* can be done:</p>
* <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
* int filled=endpoint.fill(mybuffer);
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);
* </pre></blockquote>
* // Blocks until the write succeeds or fails.
* future.get();
* </pre>
* <p>Note also that multiple buffers may be passed in {@link #write(Callback, ByteBuffer...)}
* so that gather writes can be performed for efficiency.</p>
*/
public interface EndPoint extends Closeable
{
/**
* @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
* @return The local Inet address to which this {@code EndPoint} is bound, or {@code null}
* if this {@code EndPoint} does not represent a network connection.
*/
InetSocketAddress getLocalAddress();
/**
* @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>
* if this <code>EndPoint</code> does not represent a network connection.
* @return The remote Inet address to which this {@code EndPoint} is bound, or {@code null}
* if this {@code EndPoint} does not represent a network connection.
*/
InetSocketAddress getRemoteAddress();
/**
* @return whether this EndPoint is open
*/
boolean isOpen();
/**
* @return the epoch time in milliseconds when this EndPoint was created
*/
long getCreatedTimeStamp();
/**
@ -157,7 +160,7 @@ public interface EndPoint extends Closeable
*
* @param buffer The buffer to fill. The position and limit are modified during the fill. After the
* operation, the position is unchanged and the limit is increased to reflect the new data filled.
* @return an <code>int</code> value indicating the number of bytes
* @return an {@code int} value indicating the number of bytes
* filled or -1 if EOF is read or the input is shutdown.
* @throws IOException if the endpoint is closed.
*/
@ -232,27 +235,27 @@ public interface EndPoint extends Closeable
void write(Callback callback, ByteBuffer... buffers) throws WritePendingException;
/**
* @return the {@link Connection} associated with this {@link EndPoint}
* @return the {@link Connection} associated with this EndPoint
* @see #setConnection(Connection)
*/
Connection getConnection();
/**
* @param connection the {@link Connection} associated with this {@link EndPoint}
* @param connection the {@link Connection} associated with this EndPoint
* @see #getConnection()
* @see #upgrade(Connection)
*/
void setConnection(Connection connection);
/**
* <p>Callback method invoked when this {@link EndPoint} is opened.</p>
* <p>Callback method invoked when this EndPoint is opened.</p>
*
* @see #onClose()
*/
void onOpen();
/**
* <p>Callback method invoked when this {@link EndPoint} is close.</p>
* <p>Callback method invoked when this EndPoint is close.</p>
*
* @see #onOpen()
*/
@ -266,13 +269,18 @@ public interface EndPoint extends Closeable
boolean isOptimizedForDirectBuffers();
/**
* Upgrade connections.
* Close the old connection, update the endpoint and open the new connection.
* If the oldConnection is an instance of {@link Connection.UpgradeFrom} then
* a prefilled buffer is requested and passed to the newConnection if it is an instance
* of {@link Connection.UpgradeTo}
* <p>Upgrades this EndPoint from the current connection to the given new connection.</p>
* <p>Closes the current connection, links this EndPoint to the new connection and
* then opens the new connection.</p>
* <p>If the current connection is an instance of {@link Connection.UpgradeFrom} then
* a buffer of unconsumed bytes is requested.
* If the buffer of unconsumed bytes is non-null and non-empty, then the new
* connection is tested: if it is an instance of {@link Connection.UpgradeTo}, then
* the unconsumed buffer is passed to the new connection; otherwise, an exception
* is thrown since there are unconsumed bytes that cannot be consumed by the new
* connection.</p>
*
* @param newConnection The connection to upgrade to
* @param newConnection the connection to upgrade to
*/
void upgrade(Connection newConnection);
}

View File

@ -92,6 +92,8 @@ public abstract class FillInterest
/**
* Call to signal that a read is now possible.
*
* @return whether the callback was notified that a read is now possible
*/
public boolean fillable()
{

View File

@ -28,6 +28,8 @@ import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.</p>
@ -38,6 +40,8 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class MappedByteBufferPool extends AbstractByteBufferPool
{
private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class);
private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>();
private final Function<Integer, Bucket> _newBucket;
@ -127,7 +131,12 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
int capacity = buffer.capacity();
// Validate that this buffer is from this pool.
assert ((capacity % getCapacityFactor()) == 0);
if ((capacity % getCapacityFactor()) != 0)
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}
int b = bucketFor(capacity);
boolean direct = buffer.isDirect();

View File

@ -346,11 +346,8 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (BufferUtil.hasContent(buffer))
{
acquireEncryptedInput();
BufferUtil.append(_encryptedInput, buffer);
}
acquireEncryptedInput();
BufferUtil.append(_encryptedInput, buffer);
}
@Override

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -150,6 +151,17 @@ public class ArrayByteBufferPoolTest
}
}
@Test
public void testReleaseNonPooledBuffer()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool();
// Release a few small non-pool buffers
bufferPool.release(ByteBuffer.wrap(StringUtil.getUtf8Bytes("Hello")));
assertEquals(0, bufferPool.getHeapByteBufferCount());
}
@Test
public void testMaxQueue()
{

View File

@ -35,7 +35,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class MappedByteBufferPoolTest
{
@ -95,34 +94,15 @@ public class MappedByteBufferPoolTest
assertTrue(buckets.isEmpty());
}
/**
* In a scenario where MappedByteBufferPool is being used improperly,
* such as releasing a buffer that wasn't created/acquired by the
* MappedByteBufferPool, an assertion is tested for.
*/
@Test
public void testReleaseAssertion()
public void testReleaseNonPooledBuffer()
{
int factor = 1024;
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor);
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
try
{
// Release a few small non-pool buffers
bufferPool.release(ByteBuffer.wrap(StringUtil.getUtf8Bytes("Hello")));
// Release a few small non-pool buffers
bufferPool.release(ByteBuffer.wrap(StringUtil.getUtf8Bytes("Hello")));
/* NOTES:
*
* 1) This test will pass on command line maven build, as its surefire setup uses "-ea" already.
* 2) In Eclipse, goto the "Run Configuration" for this test case.
* Select the "Arguments" tab, and make sure "-ea" is present in the text box titled "VM arguments"
*/
fail("Expected java.lang.AssertionError, do you have '-ea' JVM command line option enabled?");
}
catch (java.lang.AssertionError e)
{
// Expected path.
}
assertEquals(0, bufferPool.getHeapByteBufferCount());
}
@Test

View File

@ -50,7 +50,6 @@ import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.Promise;
@ -613,19 +612,27 @@ public class ConnectHandler extends HandlerWrapper
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer;
this.buffer = buffer;
}
@Override
public void onOpen()
{
super.onOpen();
final int remaining = buffer.remaining();
if (buffer == null)
{
fillInterested();
return;
}
int remaining = buffer.remaining();
write(getConnection().getEndPoint(), buffer, new Callback()
{
@Override
public void succeeded()
{
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug("{} wrote initial {} bytes to server", DownstreamConnection.this, remaining);
fillInterested();
@ -634,6 +641,7 @@ public class ConnectHandler extends HandlerWrapper
@Override
public void failed(Throwable x)
{
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug(this + " failed to write initial " + remaining + " bytes to server", x);
close();

View File

@ -153,18 +153,25 @@ public class DetectorConnectionFactory extends AbstractConnectionFactory impleme
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Detector {} copying prefilled buffer {}", getProtocol(), BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Detector {} copying unconsumed buffer {}", getProtocol(), BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}
@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
unconsumed.flip();
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}
@Override

View File

@ -192,9 +192,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
if (BufferUtil.hasContent(_requestBuffer))
{
ByteBuffer buffer = _requestBuffer;
_requestBuffer = null;
return buffer;
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining());
unconsumed.put(_requestBuffer);
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
}
return null;
}
@ -202,8 +204,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (BufferUtil.hasContent(buffer))
BufferUtil.append(getRequestBuffer(), buffer);
BufferUtil.append(getRequestBuffer(), buffer);
}
@Override

View File

@ -234,16 +234,23 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory
@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
unconsumed.flip();
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Proxy v1 copying prefilled buffer {}", BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Proxy v1 copying unconsumed buffer {}", BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}
/**
@ -442,12 +449,11 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Proxy v2 copying prefilled buffer {}", BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Proxy v2 copying unconsumed buffer {}", BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}
@Override
@ -540,7 +546,15 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory
@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
unconsumed.flip();
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}
private void parseBodyAndUpgrade() throws IOException

View File

@ -145,7 +145,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final LongAdder bytesIn = new LongAdder();
private WebSocketSession session;
private List<ExtensionConfig> extensions = new ArrayList<>();
private ByteBuffer prefillBuffer;
private ByteBuffer initialBuffer;
private Stats stats = new Stats();
private CloseInfo fatalCloseInfo;
@ -255,13 +255,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if (connectionState.opened())
{
if (BufferUtil.hasContent(prefillBuffer))
if (BufferUtil.hasContent(initialBuffer))
{
if (LOG.isDebugEnabled())
{
LOG.debug("Parsing Upgrade prefill buffer ({} remaining)", prefillBuffer.remaining());
}
parser.parse(prefillBuffer);
LOG.debug("Parsing upgrade initial buffer ({} remaining)", initialBuffer.remaining());
parser.parse(initialBuffer);
initialBuffer = null;
}
fillInterested();
return true;
@ -545,15 +544,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* be processed by the websocket parser before starting
* to read bytes from the connection
*
* @param prefilled the bytes of prefilled content encountered during upgrade
* @param initialBuffer the bytes of unconsumed content encountered during upgrade
*/
protected void setInitialBuffer(ByteBuffer prefilled)
protected void setInitialBuffer(ByteBuffer initialBuffer)
{
if (LOG.isDebugEnabled())
{
LOG.debug("set Initial Buffer - {}", BufferUtil.toDetailString(prefilled));
}
prefillBuffer = prefilled;
LOG.debug("Set initial buffer - {}", BufferUtil.toDetailString(initialBuffer));
this.initialBuffer = initialBuffer;
}
/**
@ -646,14 +643,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* to read bytes from the connection
*/
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(prefilled));
}
setInitialBuffer(prefilled);
LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(buffer));
setInitialBuffer(buffer);
}
@Override

View File

@ -173,9 +173,9 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
setInitialBuffer(prefilled);
setInitialBuffer(buffer);
}
@Override
@ -259,20 +259,15 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
* be processed by the websocket parser before starting
* to read bytes from the connection
*
* @param prefilled the bytes of prefilled content encountered during upgrade
* @param initialBuffer the bytes of unconsumed content encountered during upgrade
*/
protected void setInitialBuffer(ByteBuffer prefilled)
protected void setInitialBuffer(ByteBuffer initialBuffer)
{
if (log.isDebugEnabled())
if (BufferUtil.hasContent(initialBuffer))
{
log.debug("set Initial Buffer - {}", BufferUtil.toDetailString(prefilled));
}
if ((prefilled != null) && (prefilled.hasRemaining()))
{
networkBuffer = bufferPool.acquire(prefilled.remaining(), true);
networkBuffer = bufferPool.acquire(initialBuffer.remaining(), true);
BufferUtil.clearToFill(networkBuffer);
BufferUtil.put(prefilled, networkBuffer);
BufferUtil.put(initialBuffer, networkBuffer);
BufferUtil.flipToFlush(networkBuffer, 0);
}
}