ARTEMIS-928 Changing Netty and InVM to copy buffers, and retain them on the Netty Polls.

This commit is contained in:
Will Reichert 2017-01-04 10:14:16 -06:00 committed by Clebert Suconic
parent 7a7f335271
commit 3347a4fd27
22 changed files with 171 additions and 14 deletions

View File

@ -1149,4 +1149,10 @@ public interface ActiveMQBuffer extends DataInput {
* @return A converted NIO Buffer * @return A converted NIO Buffer
*/ */
ByteBuffer toByteBuffer(int index, int length); ByteBuffer toByteBuffer(int index, int length);
/**
* Release any underlying resources held by this buffer
*/
void release();
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@ -26,6 +27,9 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
*/ */
public final class ActiveMQBuffers { public final class ActiveMQBuffers {
private static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator();
/** /**
* Creates a <em>self-expanding</em> ActiveMQBuffer with the given initial size * Creates a <em>self-expanding</em> ActiveMQBuffer with the given initial size
* *
@ -36,6 +40,11 @@ public final class ActiveMQBuffers {
return new ChannelBufferWrapper(Unpooled.buffer(size)); return new ChannelBufferWrapper(Unpooled.buffer(size));
} }
public static ActiveMQBuffer pooledBuffer(final int size) {
return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true);
}
/** /**
* Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array * Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array
* *

View File

@ -31,7 +31,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
protected ByteBuf buffer; // NO_UCD (use final) protected ByteBuf buffer; // NO_UCD (use final)
private final boolean releasable; private final boolean releasable;
private final boolean isPooled;
public static ByteBuf unwrap(ByteBuf buffer) { public static ByteBuf unwrap(ByteBuf buffer) {
ByteBuf parent; ByteBuf parent;
while ((parent = buffer.unwrap()) != null && parent != buffer) { // this last part is just in case the semantic while ((parent = buffer.unwrap()) != null && parent != buffer) { // this last part is just in case the semantic
@ -45,14 +45,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
public ChannelBufferWrapper(final ByteBuf buffer) { public ChannelBufferWrapper(final ByteBuf buffer) {
this(buffer, false); this(buffer, false);
} }
public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable) { public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable) {
this(buffer, releasable, false);
}
public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable, boolean pooled) {
if (!releasable) { if (!releasable) {
this.buffer = Unpooled.unreleasableBuffer(buffer); this.buffer = Unpooled.unreleasableBuffer(buffer);
} else { } else {
this.buffer = buffer; this.buffer = buffer;
} }
this.releasable = releasable; this.releasable = releasable;
this.isPooled = pooled;
} }
@Override @Override
@ -398,7 +402,19 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
@Override @Override
public ActiveMQBuffer readSlice(final int length) { public ActiveMQBuffer readSlice(final int length) {
return new ChannelBufferWrapper(buffer.readSlice(length), releasable); if ( isPooled ) {
ByteBuf fromBuffer = buffer.readSlice(length);
ByteBuf newNettyBuffer = Unpooled.buffer(fromBuffer.capacity());
int read = fromBuffer.readerIndex();
int writ = fromBuffer.writerIndex();
fromBuffer.readerIndex(0);
fromBuffer.readBytes(newNettyBuffer,0,writ);
newNettyBuffer.setIndex(read,writ);
ActiveMQBuffer returnBuffer = new ChannelBufferWrapper(newNettyBuffer,releasable,false);
returnBuffer.setIndex(read,writ);
return returnBuffer;
}
return new ChannelBufferWrapper(buffer.readSlice(length), releasable, isPooled);
} }
@Override @Override
@ -522,6 +538,13 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
return buffer.nioBuffer(index, length); return buffer.nioBuffer(index, length);
} }
@Override
public void release() {
if ( this.isPooled ) {
buffer.release();
}
}
@Override @Override
public boolean writable() { public boolean writable() {
return buffer.isWritable(); return buffer.isWritable();

View File

@ -202,6 +202,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
} }
@Override
public void release() {
//no-op
}
@Override @Override
public int readerIndex() { public int readerIndex() {
return 0; return 0;

View File

@ -524,6 +524,11 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
} }
@Override
public void release() {
//no-op
}
@Override @Override
public int readerIndex() { public int readerIndex() {
return (int) readerIndex; return (int) readerIndex;

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -76,9 +77,9 @@ public abstract class MessageImpl implements MessageInternal {
protected byte priority; protected byte priority;
protected ActiveMQBuffer buffer; protected volatile ActiveMQBuffer buffer;
protected ResetLimitWrappedActiveMQBuffer bodyBuffer; protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer;
protected volatile boolean bufferValid; protected volatile boolean bufferValid;
@ -434,12 +435,16 @@ public abstract class MessageImpl implements MessageInternal {
@Override @Override
public void decodeFromBuffer(final ActiveMQBuffer buffer) { public void decodeFromBuffer(final ActiveMQBuffer buffer) {
this.buffer = buffer;
this.buffer = copyMessageBuffer(buffer);
decode(); decode();
//synchronize indexes
buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex());
// Setting up the BodyBuffer based on endOfBodyPosition set from decode // Setting up the BodyBuffer based on endOfBodyPosition set from decode
ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, null); ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null);
tmpbodyBuffer.readerIndex(BODY_OFFSET); tmpbodyBuffer.readerIndex(BODY_OFFSET);
tmpbodyBuffer.writerIndex(endOfBodyPosition); tmpbodyBuffer.writerIndex(endOfBodyPosition);
// only set this after the writer and reader is set, // only set this after the writer and reader is set,
@ -449,6 +454,30 @@ public abstract class MessageImpl implements MessageInternal {
} }
private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) {
ActiveMQBuffer copiedBuffer;
ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() );
int read = buffer.byteBuf().readerIndex();
int writ = buffer.byteBuf().writerIndex();
int readArt = buffer.readerIndex();
int writArt = buffer.writerIndex();
buffer.byteBuf().readerIndex( 0 );
buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() );
buffer.byteBuf().setIndex( read, writ );
newNettyBuffer.setIndex( read, writ );
copiedBuffer = new ChannelBufferWrapper( newNettyBuffer );
buffer.setIndex( readArt, writArt );
copiedBuffer.setIndex( readArt, writArt );
return copiedBuffer;
}
@Override @Override
public void bodyChanged() { public void bodyChanged() {
bufferValid = false; bufferValid = false;

View File

@ -63,6 +63,15 @@ public interface Packet {
*/ */
ActiveMQBuffer encode(RemotingConnection connection); ActiveMQBuffer encode(RemotingConnection connection);
/**
* Encodes the packet and returns a {@link ActiveMQBuffer} containing the data
*
* @param connection the connection
* @param usePooled if the returned buffer should be pooled or unpooled
* @return the buffer to encode to
*/
ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled);
/** /**
* decodes the buffer into this packet * decodes the buffer into this packet
* *

View File

@ -919,6 +919,8 @@ public class ActiveMQSessionContext extends SessionContext {
ActiveMQBuffer buffer = packet.encode(this.getCoreConnection()); ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
conn.write(buffer, false, false); conn.write(buffer, false, false);
buffer.release();
} }
} }

View File

@ -305,6 +305,8 @@ public final class ChannelImpl implements Channel {
// buffer is full, preventing any incoming buffers being handled and blocking failover // buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch); connection.getTransportConnection().write(buffer, flush, batch);
buffer.release();
return true; return true;
} }
} }
@ -412,6 +414,7 @@ public final class ChannelImpl implements Channel {
} }
} finally { } finally {
lock.unlock(); lock.unlock();
buffer.release();
} }
return response; return response;
@ -634,6 +637,9 @@ public final class ChannelImpl implements Channel {
final ActiveMQBuffer buffer = packet.encode(connection); final ActiveMQBuffer buffer = packet.encode(connection);
connection.getTransportConnection().write(buffer, false, false); connection.getTransportConnection().write(buffer, false, false);
buffer.release();
} }
private void addResendPacket(Packet packet) { private void addResendPacket(Packet packet) {

View File

@ -304,7 +304,13 @@ public class PacketImpl implements Packet {
@Override @Override
public ActiveMQBuffer encode(final RemotingConnection connection) { public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); return encode(connection,true);
}
@Override
public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
// The standard header fields // The standard header fields

View File

@ -56,7 +56,7 @@ public class SessionReceiveMessage extends MessagePacket {
public ActiveMQBuffer encode(final RemotingConnection connection) { public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer(); ActiveMQBuffer buffer = message.getEncodedBuffer();
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex()); ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity()); bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

View File

@ -68,7 +68,7 @@ public class SessionSendMessage extends MessagePacket {
// this is for unit tests only // this is for unit tests only
bufferWrite = buffer.copy(0, buffer.capacity()); bufferWrite = buffer.copy(0, buffer.capacity());
} else { } else {
bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
} }
bufferWrite.writeBytes(buffer, 0, buffer.writerIndex()); bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

View File

@ -210,6 +210,11 @@ public class NettyConnection implements Connection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(final int size) { public ActiveMQBuffer createTransportBuffer(final int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
} }

View File

@ -178,6 +178,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return transportConnection.createTransportBuffer(size); return transportConnection.createTransportBuffer(size);
} }
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return transportConnection.createTransportBuffer(size, pooled);
}
@Override @Override
public Connection getTransportConnection() { public Connection getTransportConnection() {
return transportConnection; return transportConnection;

View File

@ -120,6 +120,8 @@ public interface RemotingConnection extends BufferHandler {
*/ */
ActiveMQBuffer createTransportBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
/** /**
* called when the underlying connection fails. * called when the underlying connection fails.
* *

View File

@ -35,6 +35,8 @@ public interface Connection {
*/ */
ActiveMQBuffer createTransportBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
RemotingConnection getProtocolConnection(); RemotingConnection getProtocolConnection();
void setProtocolConnection(RemotingConnection connection); void setProtocolConnection(RemotingConnection connection);

View File

@ -765,5 +765,10 @@ public class TestConversions extends Assert {
public ByteBuffer toByteBuffer(int index, int length) { public ByteBuffer toByteBuffer(int index, int length) {
return null; return null;
} }
@Override
public void release() {
//no-op
}
} }
} }

View File

@ -132,6 +132,11 @@ public class MQTTConnection implements RemotingConnection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(int size) { public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return transportConnection.createTransportBuffer(size); return transportConnection.createTransportBuffer(size);
} }

View File

@ -293,6 +293,11 @@ public final class StompConnection implements RemotingConnection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(int size) { public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return ActiveMQBuffers.dynamicBuffer(size); return ActiveMQBuffers.dynamicBuffer(size);
} }

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class QuorumVoteMessage extends PacketImpl { public class QuorumVoteMessage extends PacketImpl {
@ -40,6 +41,11 @@ public class QuorumVoteMessage extends PacketImpl {
this.vote = vote; this.vote = vote;
} }
@Override
public ActiveMQBuffer encode(final RemotingConnection connection) {
return encode(connection,false);
}
@Override @Override
public void encodeRest(ActiveMQBuffer buffer) { public void encodeRest(ActiveMQBuffer buffer) {
super.encodeRest(buffer); super.encodeRest(buffer);

View File

@ -146,7 +146,16 @@ public class InVMConnection implements Connection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(final int size) { public ActiveMQBuffer createTransportBuffer(final int size) {
return ActiveMQBuffers.dynamicBuffer(size); return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
if ( pooled ) {
return ActiveMQBuffers.pooledBuffer( size );
} else {
return ActiveMQBuffers.dynamicBuffer( size );
}
} }
@Override @Override
@ -173,9 +182,13 @@ public class InVMConnection implements Connection {
final boolean flush, final boolean flush,
final boolean batch, final boolean batch,
final ChannelFutureListener futureListener) { final ChannelFutureListener futureListener) {
final ActiveMQBuffer copied = buffer.copy(0, buffer.capacity());
copied.setIndex(buffer.readerIndex(), buffer.writerIndex()); final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity());
int read = buffer.readerIndex();
int writ = buffer.writerIndex();
copied.writeBytes(buffer,read,writ - read);
copied.setIndex(read,writ);
buffer.setIndex(read,writ);
try { try {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -201,6 +214,10 @@ public class InVMConnection implements Connection {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(InVMConnection.this + "::packet sent done"); logger.trace(InVMConnection.this + "::packet sent done");
} }
copied.release();
// if ( copied.byteBuf().refCnt() > 0 ) {
// copied.release();
// }
} }
} }
}); });

View File

@ -286,13 +286,18 @@ public class MessageImplTest extends ActiveMQTestBase {
} }
for (int i = 0; i < RUNS; i++) { for (int i = 0; i < RUNS; i++) {
ActiveMQBuffer buf = null;
try { try {
SessionSendMessage ssm = new SessionSendMessage(msg); SessionSendMessage ssm = new SessionSendMessage(msg);
ActiveMQBuffer buf = ssm.encode(null); buf = ssm.encode(null);
simulateRead(buf); simulateRead(buf);
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
errors.incrementAndGet(); errors.incrementAndGet();
} finally {
if ( buf != null ) {
buf.release();
}
} }
} }
} }