ARTEMIS-928 Changing Netty and InVM to copy buffers, and retain them on the Netty Polls.

(cherry picked from commit 3347a4fd27)
This commit is contained in:
Will Reichert 2017-01-04 10:14:16 -06:00 committed by Clebert Suconic
parent aaf09262ff
commit 036933a4a4
22 changed files with 171 additions and 14 deletions

View File

@ -1149,4 +1149,10 @@ public interface ActiveMQBuffer extends DataInput {
* @return A converted NIO Buffer
*/
ByteBuffer toByteBuffer(int index, int length);
/**
* Release any underlying resources held by this buffer
*/
void release();
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
import java.nio.ByteBuffer;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@ -26,6 +27,9 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
*/
public final class ActiveMQBuffers {
private static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator();
/**
* Creates a <em>self-expanding</em> ActiveMQBuffer with the given initial size
*
@ -36,6 +40,11 @@ public final class ActiveMQBuffers {
return new ChannelBufferWrapper(Unpooled.buffer(size));
}
public static ActiveMQBuffer pooledBuffer(final int size) {
return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true);
}
/**
* Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array
*

View File

@ -31,7 +31,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
protected ByteBuf buffer; // NO_UCD (use final)
private final boolean releasable;
private final boolean isPooled;
public static ByteBuf unwrap(ByteBuf buffer) {
ByteBuf parent;
while ((parent = buffer.unwrap()) != null && parent != buffer) { // this last part is just in case the semantic
@ -45,14 +45,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
public ChannelBufferWrapper(final ByteBuf buffer) {
this(buffer, false);
}
public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable) {
this(buffer, releasable, false);
}
public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable, boolean pooled) {
if (!releasable) {
this.buffer = Unpooled.unreleasableBuffer(buffer);
} else {
this.buffer = buffer;
}
this.releasable = releasable;
this.isPooled = pooled;
}
@Override
@ -398,7 +402,19 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
@Override
public ActiveMQBuffer readSlice(final int length) {
return new ChannelBufferWrapper(buffer.readSlice(length), releasable);
if ( isPooled ) {
ByteBuf fromBuffer = buffer.readSlice(length);
ByteBuf newNettyBuffer = Unpooled.buffer(fromBuffer.capacity());
int read = fromBuffer.readerIndex();
int writ = fromBuffer.writerIndex();
fromBuffer.readerIndex(0);
fromBuffer.readBytes(newNettyBuffer,0,writ);
newNettyBuffer.setIndex(read,writ);
ActiveMQBuffer returnBuffer = new ChannelBufferWrapper(newNettyBuffer,releasable,false);
returnBuffer.setIndex(read,writ);
return returnBuffer;
}
return new ChannelBufferWrapper(buffer.readSlice(length), releasable, isPooled);
}
@Override
@ -522,6 +538,13 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
return buffer.nioBuffer(index, length);
}
@Override
public void release() {
if ( this.isPooled ) {
buffer.release();
}
}
@Override
public boolean writable() {
return buffer.isWritable();

View File

@ -202,6 +202,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
@Override
public void release() {
//no-op
}
@Override
public int readerIndex() {
return 0;

View File

@ -524,6 +524,11 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
@Override
public void release() {
//no-op
}
@Override
public int readerIndex() {
return (int) readerIndex;

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -76,9 +77,9 @@ public abstract class MessageImpl implements MessageInternal {
protected byte priority;
protected ActiveMQBuffer buffer;
protected volatile ActiveMQBuffer buffer;
protected ResetLimitWrappedActiveMQBuffer bodyBuffer;
protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer;
protected volatile boolean bufferValid;
@ -434,12 +435,16 @@ public abstract class MessageImpl implements MessageInternal {
@Override
public void decodeFromBuffer(final ActiveMQBuffer buffer) {
this.buffer = buffer;
this.buffer = copyMessageBuffer(buffer);
decode();
//synchronize indexes
buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex());
// Setting up the BodyBuffer based on endOfBodyPosition set from decode
ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, null);
ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null);
tmpbodyBuffer.readerIndex(BODY_OFFSET);
tmpbodyBuffer.writerIndex(endOfBodyPosition);
// only set this after the writer and reader is set,
@ -449,6 +454,30 @@ public abstract class MessageImpl implements MessageInternal {
}
private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) {
ActiveMQBuffer copiedBuffer;
ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() );
int read = buffer.byteBuf().readerIndex();
int writ = buffer.byteBuf().writerIndex();
int readArt = buffer.readerIndex();
int writArt = buffer.writerIndex();
buffer.byteBuf().readerIndex( 0 );
buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() );
buffer.byteBuf().setIndex( read, writ );
newNettyBuffer.setIndex( read, writ );
copiedBuffer = new ChannelBufferWrapper( newNettyBuffer );
buffer.setIndex( readArt, writArt );
copiedBuffer.setIndex( readArt, writArt );
return copiedBuffer;
}
@Override
public void bodyChanged() {
bufferValid = false;

View File

@ -63,6 +63,15 @@ public interface Packet {
*/
ActiveMQBuffer encode(RemotingConnection connection);
/**
* Encodes the packet and returns a {@link ActiveMQBuffer} containing the data
*
* @param connection the connection
* @param usePooled if the returned buffer should be pooled or unpooled
* @return the buffer to encode to
*/
ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled);
/**
* decodes the buffer into this packet
*

View File

@ -877,6 +877,8 @@ public class ActiveMQSessionContext extends SessionContext {
ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
conn.write(buffer, false, false);
buffer.release();
}
}

View File

@ -301,6 +301,8 @@ public final class ChannelImpl implements Channel {
// buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch);
buffer.release();
return true;
}
}
@ -408,6 +410,7 @@ public final class ChannelImpl implements Channel {
}
} finally {
lock.unlock();
buffer.release();
}
return response;
@ -630,6 +633,9 @@ public final class ChannelImpl implements Channel {
final ActiveMQBuffer buffer = packet.encode(connection);
connection.getTransportConnection().write(buffer, false, false);
buffer.release();
}
private void addResendPacket(Packet packet) {

View File

@ -274,7 +274,13 @@ public class PacketImpl implements Packet {
@Override
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
return encode(connection,true);
}
@Override
public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
// The standard header fields

View File

@ -56,7 +56,7 @@ public class SessionReceiveMessage extends MessagePacket {
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex());
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

View File

@ -68,7 +68,7 @@ public class SessionSendMessage extends MessagePacket {
// this is for unit tests only
bufferWrite = buffer.copy(0, buffer.capacity());
} else {
bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
}
bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

View File

@ -210,6 +210,11 @@ public class NettyConnection implements Connection {
@Override
public ActiveMQBuffer createTransportBuffer(final int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
}

View File

@ -178,6 +178,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return transportConnection.createTransportBuffer(size);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return transportConnection.createTransportBuffer(size, pooled);
}
@Override
public Connection getTransportConnection() {
return transportConnection;

View File

@ -120,6 +120,8 @@ public interface RemotingConnection extends BufferHandler {
*/
ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
/**
* called when the underlying connection fails.
*

View File

@ -35,6 +35,8 @@ public interface Connection {
*/
ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
RemotingConnection getProtocolConnection();
void setProtocolConnection(RemotingConnection connection);

View File

@ -765,5 +765,10 @@ public class TestConversions extends Assert {
public ByteBuffer toByteBuffer(int index, int length) {
return null;
}
@Override
public void release() {
//no-op
}
}
}

View File

@ -132,6 +132,11 @@ public class MQTTConnection implements RemotingConnection {
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return transportConnection.createTransportBuffer(size);
}

View File

@ -265,6 +265,11 @@ public final class StompConnection implements RemotingConnection {
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return ActiveMQBuffers.dynamicBuffer(size);
}

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class QuorumVoteMessage extends PacketImpl {
@ -40,6 +41,11 @@ public class QuorumVoteMessage extends PacketImpl {
this.vote = vote;
}
@Override
public ActiveMQBuffer encode(final RemotingConnection connection) {
return encode(connection,false);
}
@Override
public void encodeRest(ActiveMQBuffer buffer) {
super.encodeRest(buffer);

View File

@ -146,7 +146,16 @@ public class InVMConnection implements Connection {
@Override
public ActiveMQBuffer createTransportBuffer(final int size) {
return ActiveMQBuffers.dynamicBuffer(size);
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
if ( pooled ) {
return ActiveMQBuffers.pooledBuffer( size );
} else {
return ActiveMQBuffers.dynamicBuffer( size );
}
}
@Override
@ -173,9 +182,13 @@ public class InVMConnection implements Connection {
final boolean flush,
final boolean batch,
final ChannelFutureListener futureListener) {
final ActiveMQBuffer copied = buffer.copy(0, buffer.capacity());
copied.setIndex(buffer.readerIndex(), buffer.writerIndex());
final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity());
int read = buffer.readerIndex();
int writ = buffer.writerIndex();
copied.writeBytes(buffer,read,writ - read);
copied.setIndex(read,writ);
buffer.setIndex(read,writ);
try {
executor.execute(new Runnable() {
@ -201,6 +214,10 @@ public class InVMConnection implements Connection {
if (logger.isTraceEnabled()) {
logger.trace(InVMConnection.this + "::packet sent done");
}
copied.release();
// if ( copied.byteBuf().refCnt() > 0 ) {
// copied.release();
// }
}
}
});

View File

@ -286,13 +286,18 @@ public class MessageImplTest extends ActiveMQTestBase {
}
for (int i = 0; i < RUNS; i++) {
ActiveMQBuffer buf = null;
try {
SessionSendMessage ssm = new SessionSendMessage(msg);
ActiveMQBuffer buf = ssm.encode(null);
buf = ssm.encode(null);
simulateRead(buf);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} finally {
if ( buf != null ) {
buf.release();
}
}
}
}