ARTEMIS-1077 Fixing Pool usage on InVM and RemotingConnection

This commit is contained in:
Clebert Suconic 2017-03-27 11:54:44 -04:00 committed by Justin Bertram
parent 3efeccfa04
commit b395504e15
17 changed files with 20 additions and 94 deletions

View File

@ -63,15 +63,6 @@ public interface Packet {
*/ */
ActiveMQBuffer encode(RemotingConnection connection); ActiveMQBuffer encode(RemotingConnection connection);
/**
* Encodes the packet and returns a {@link ActiveMQBuffer} containing the data
*
* @param connection the connection
* @param usePooled if the returned buffer should be pooled or unpooled
* @return the buffer to encode to
*/
ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled);
/** /**
* decodes the buffer into this packet * decodes the buffer into this packet
* *

View File

@ -920,8 +920,6 @@ public class ActiveMQSessionContext extends SessionContext {
ActiveMQBuffer buffer = packet.encode(this.getCoreConnection()); ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
conn.write(buffer, false, false); conn.write(buffer, false, false);
buffer.release();
} }
} }

View File

@ -304,9 +304,6 @@ public final class ChannelImpl implements Channel {
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover // buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch); connection.getTransportConnection().write(buffer, flush, batch);
buffer.release();
return true; return true;
} }
} }
@ -415,7 +412,6 @@ public final class ChannelImpl implements Channel {
} }
} finally { } finally {
lock.unlock(); lock.unlock();
buffer.release();
} }
return response; return response;
@ -639,8 +635,6 @@ public final class ChannelImpl implements Channel {
connection.getTransportConnection().write(buffer, false, false); connection.getTransportConnection().write(buffer, false, false);
buffer.release();
} }
private void addResendPacket(Packet packet) { private void addResendPacket(Packet packet) {

View File

@ -308,13 +308,7 @@ public class PacketImpl implements Packet {
@Override @Override
public ActiveMQBuffer encode(final RemotingConnection connection) { public ActiveMQBuffer encode(final RemotingConnection connection) {
return encode(connection,true); ActiveMQBuffer buffer = createPacket(connection);
}
@Override
public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
ActiveMQBuffer buffer = createPacket(connection, usePooled);
// The standard header fields // The standard header fields
@ -334,11 +328,11 @@ public class PacketImpl implements Packet {
return buffer; return buffer;
} }
protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { protected ActiveMQBuffer createPacket(RemotingConnection connection) {
if (connection == null) { if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE)); return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE));
} else { } else {
return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled); return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
} }
} }

View File

@ -43,11 +43,11 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
return super.getParentString() + ", message=" + message; return super.getParentString() + ", message=" + message;
} }
protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection, boolean usePooled) { protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection) {
if (connection == null) { if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size)); return new ChannelBufferWrapper(Unpooled.buffer(size));
} else { } else {
return connection.createTransportBuffer(size, usePooled); return connection.createTransportBuffer(size);
} }
} }

View File

@ -79,12 +79,12 @@ public abstract class SessionContinuationMessage extends PacketImpl {
} }
@Override @Override
protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { protected final ActiveMQBuffer createPacket(RemotingConnection connection) {
final int expectedEncodedSize = expectedEncodedSize(); final int expectedEncodedSize = expectedEncodedSize();
if (connection == null) { if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
} else { } else {
return connection.createTransportBuffer(expectedEncodedSize, usePooled); return connection.createTransportBuffer(expectedEncodedSize);
} }
} }
@ -137,4 +137,4 @@ public abstract class SessionContinuationMessage extends PacketImpl {
return true; return true;
} }
} }

View File

@ -54,8 +54,8 @@ public class SessionReceiveMessage extends MessagePacket {
} }
@Override @Override
protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { protected ActiveMQBuffer createPacket(RemotingConnection connection) {
return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection, usePooled); return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection);
} }
@Override @Override

View File

@ -62,16 +62,14 @@ public class SessionSendMessage extends MessagePacket {
} }
@Override @Override
protected ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { protected ActiveMQBuffer createPacket(RemotingConnection connection) {
return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection, usePooled); return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection);
} }
@Override @Override
public void encodeRest(ActiveMQBuffer buffer) { public void encodeRest(ActiveMQBuffer buffer) {
message.sendBuffer(buffer.byteBuf(), 0); message.sendBuffer(buffer.byteBuf(), 0);
buffer.writeBoolean(requiresResponse); buffer.writeBoolean(requiresResponse);
} }
@Override @Override

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -210,12 +211,7 @@ public class NettyConnection implements Connection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(final int size) { public ActiveMQBuffer createTransportBuffer(final int size) {
return createTransportBuffer(size, false); return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
} }
@Override @Override

View File

@ -393,7 +393,7 @@ public class NettyConnector extends AbstractConnector {
} }
channelClazz = EpollSocketChannel.class; channelClazz = EpollSocketChannel.class;
logger.info("Connector " + this + " using native epoll"); logger.debug("Connector " + this + " using native epoll");
} else { } else {
if (useGlobalWorkerPool) { if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class; channelClazz = NioSocketChannel.class;
@ -404,7 +404,7 @@ public class NettyConnector extends AbstractConnector {
} }
channelClazz = NioSocketChannel.class; channelClazz = NioSocketChannel.class;
logger.info("Connector + " + this + " using nio"); logger.debug("Connector + " + this + " using nio");
} }
// if we are a servlet wrap the socketChannelFactory // if we are a servlet wrap the socketChannelFactory

View File

@ -178,11 +178,6 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return transportConnection.createTransportBuffer(size); return transportConnection.createTransportBuffer(size);
} }
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
return transportConnection.createTransportBuffer(size, pooled);
}
@Override @Override
public Connection getTransportConnection() { public Connection getTransportConnection() {
return transportConnection; return transportConnection;

View File

@ -120,8 +120,6 @@ public interface RemotingConnection extends BufferHandler {
*/ */
ActiveMQBuffer createTransportBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
/** /**
* called when the underlying connection fails. * called when the underlying connection fails.
* *

View File

@ -35,8 +35,6 @@ public interface Connection {
*/ */
ActiveMQBuffer createTransportBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
RemotingConnection getProtocolConnection(); RemotingConnection getProtocolConnection();
void setProtocolConnection(RemotingConnection connection); void setProtocolConnection(RemotingConnection connection);

View File

@ -132,11 +132,6 @@ public class MQTTConnection implements RemotingConnection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(int size) { public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return transportConnection.createTransportBuffer(size); return transportConnection.createTransportBuffer(size);
} }

View File

@ -297,11 +297,6 @@ public final class StompConnection implements RemotingConnection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(int size) { public ActiveMQBuffer createTransportBuffer(int size) {
return createTransportBuffer(size, false);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
return ActiveMQBuffers.dynamicBuffer(size); return ActiveMQBuffers.dynamicBuffer(size);
} }

View File

@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class QuorumVoteMessage extends PacketImpl { public class QuorumVoteMessage extends PacketImpl {
@ -42,11 +41,6 @@ public class QuorumVoteMessage extends PacketImpl {
this.vote = vote; this.vote = vote;
} }
@Override
public ActiveMQBuffer encode(final RemotingConnection connection) {
return encode(connection,false);
}
@Override @Override
public void encodeRest(ActiveMQBuffer buffer) { public void encodeRest(ActiveMQBuffer buffer) {
super.encodeRest(buffer); super.encodeRest(buffer);

View File

@ -146,16 +146,7 @@ public class InVMConnection implements Connection {
@Override @Override
public ActiveMQBuffer createTransportBuffer(final int size) { public ActiveMQBuffer createTransportBuffer(final int size) {
return createTransportBuffer(size, false); return ActiveMQBuffers.pooledBuffer(size);
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
if ( pooled ) {
return ActiveMQBuffers.pooledBuffer( size );
} else {
return ActiveMQBuffers.dynamicBuffer( size );
}
} }
@Override @Override
@ -183,26 +174,18 @@ public class InVMConnection implements Connection {
final boolean batch, final boolean batch,
final ChannelFutureListener futureListener) { final ChannelFutureListener futureListener) {
final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity());
int read = buffer.readerIndex();
int writ = buffer.writerIndex();
copied.writeBytes(buffer,read,writ - read);
copied.setIndex(read,writ);
buffer.setIndex(read,writ);
try { try {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
if (!closed) { if (!closed) {
copied.readInt(); // read and discard buffer.readInt(); // read and discard
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(InVMConnection.this + "::Sending inVM packet"); logger.trace(InVMConnection.this + "::Sending inVM packet");
} }
handler.bufferReceived(id, copied); handler.bufferReceived(id, buffer);
if (futureListener != null) { if (futureListener != null) {
// TODO BEFORE MERGE: (is null a good option here?)
futureListener.operationComplete(null); futureListener.operationComplete(null);
} }
} }
@ -211,13 +194,10 @@ public class InVMConnection implements Connection {
ActiveMQServerLogger.LOGGER.errorWritingToInvmConnector(e, this); ActiveMQServerLogger.LOGGER.errorWritingToInvmConnector(e, this);
throw new IllegalStateException(msg, e); throw new IllegalStateException(msg, e);
} finally { } finally {
buffer.release();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(InVMConnection.this + "::packet sent done"); logger.trace(InVMConnection.this + "::packet sent done");
} }
copied.release();
// if ( copied.byteBuf().refCnt() > 0 ) {
// copied.release();
// }
} }
} }
}); });