ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls

https://issues.apache.org/jira/browse/ACTIVEMQ6-78 performance work

There are two aspects of this work. First avoid asynchronous packets and avoid
context switch over the executors. Packet had a method to make certain packets such
as commit to use a different executor. Since it's NIO everything is done at the Netty thread now.

The second aspect was to make sure we use the proper buffering
This commit is contained in:
Clebert Suconic 2015-02-10 11:30:18 -05:00
parent 41b28f4b23
commit f7c4d56cc7
23 changed files with 35 additions and 96 deletions

View File

@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer
protected ByteBuf buffer; // NO_UCD (use final) protected ByteBuf buffer; // NO_UCD (use final)
private final boolean releasable; private final boolean releasable;
public static ByteBuf unwrap(ByteBuf buffer)
{
ByteBuf parent;
while ((parent = buffer.unwrap()) != null &&
parent != buffer) // this last part is just in case the semantic
{ // ever changes where unwrap is returning itself
buffer = parent;
}
return buffer;
}
public ChannelBufferWrapper(final ByteBuf buffer) public ChannelBufferWrapper(final ByteBuf buffer)
{ {
this(buffer, false); this(buffer, false);

View File

@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
{ {
super(buffer.byteBuf()); // a wrapped inside a wrapper will increase the stack size.
// we fixed this here due to some profiling testing
super(unwrap(buffer.byteBuf()));
this.limit = limit; this.limit = limit;

View File

@ -85,6 +85,4 @@ public interface Packet
* @return true if confirmation is required * @return true if confirmation is required
*/ */
boolean isRequiresConfirmations(); boolean isRequiresConfirmations();
boolean isAsyncExec();
} }

View File

@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{ {
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
String handshake = "HORNETQ"; String handshake = "HORNETQ";
ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length()); ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
amqbuffer.writeBytes(handshake.getBytes()); amqbuffer.writeBytes(handshake.getBytes());
transportConnection.write(amqbuffer); transportConnection.write(amqbuffer);
} }

View File

@ -276,7 +276,7 @@ public class PacketImpl implements Packet
public ActiveMQBuffer encode(final RemotingConnection connection) public ActiveMQBuffer encode(final RemotingConnection connection)
{ {
ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE); ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
// The standard header fields // The standard header fields
@ -333,11 +333,6 @@ public class PacketImpl implements Packet
return true; return true;
} }
public boolean isAsyncExec()
{
return false;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final Object failLock = new Object(); private final Object failLock = new Object();
private volatile boolean executing;
private final SimpleString nodeID; private final SimpleString nodeID;
private String clientID; private String clientID;
@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet); ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
} }
if (packet.isAsyncExec() && executor != null) dataReceived = true;
{
executing = true;
executor.execute(new Runnable()
{
public void run()
{
try
{
doBufferReceived(packet); doBufferReceived(packet);
}
catch (Throwable t)
{
ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet);
}
executing = false;
}
});
}
else
{
//To prevent out of order execution if interleaving sync and async operations on same connection
while (executing)
{
Thread.yield();
}
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
super.bufferReceived(connectionID, buffer); super.bufferReceived(connectionID, buffer);
} }

View File

@ -69,12 +69,6 @@ public class RollbackMessage extends PacketImpl
considerLastMessageAsDelivered = buffer.readBoolean(); considerLastMessageAsDelivered = buffer.readBoolean();
} }
@Override
public boolean isAsyncExec()
{
return true;
}
@Override @Override
public int hashCode() public int hashCode()
{ {

View File

@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl
// TODO // TODO
return 0; return 0;
} }
@Override
public boolean isAsyncExec()
{
return true;
}
} }

View File

@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl
super(SESS_COMMIT); super(SESS_COMMIT);
} }
@Override
public boolean isAsyncExec()
{
return true;
}
} }

View File

@ -55,12 +55,6 @@ public class SessionXACommitMessage extends PacketImpl
return onePhase; return onePhase;
} }
@Override
public boolean isAsyncExec()
{
return true;
}
@Override @Override
public void encodeRest(final ActiveMQBuffer buffer) public void encodeRest(final ActiveMQBuffer buffer)
{ {

View File

@ -61,12 +61,6 @@ public class SessionXAPrepareMessage extends PacketImpl
xid = XidCodecSupport.decodeXid(buffer); xid = XidCodecSupport.decodeXid(buffer);
} }
@Override
public boolean isAsyncExec()
{
return true;
}
@Override @Override
public int hashCode() public int hashCode()
{ {

View File

@ -62,12 +62,6 @@ public class SessionXARollbackMessage extends PacketImpl
xid = XidCodecSupport.decodeXid(buffer); xid = XidCodecSupport.decodeXid(buffer);
} }
@Override
public boolean isAsyncExec()
{
return true;
}
@Override @Override
public int hashCode() public int hashCode()
{ {

View File

@ -172,9 +172,9 @@ public class NettyConnection implements Connection
listener.connectionDestroyed(getID()); listener.connectionDestroyed(getID());
} }
public ActiveMQBuffer createBuffer(final int size) public ActiveMQBuffer createTransportBuffer(final int size)
{ {
return new ChannelBufferWrapper(channel.alloc().buffer(size)); return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
} }
public Object getID() public Object getID()
@ -199,7 +199,7 @@ public class NettyConnection implements Connection
{ {
channel.writeAndFlush(batchBuffer.byteBuf()); channel.writeAndFlush(batchBuffer.byteBuf());
batchBuffer = createBuffer(BATCHING_BUFFER_SIZE); batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
} }
} }
finally finally

View File

@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector
} }
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false)); bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
final SSLContext context; final SSLContext context;

View File

@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection
closeListeners.addAll(listeners); closeListeners.addAll(listeners);
} }
public ActiveMQBuffer createBuffer(final int size) public ActiveMQBuffer createTransportBuffer(final int size)
{ {
return transportConnection.createBuffer(size); return transportConnection.createTransportBuffer(size);
} }
public Connection getTransportConnection() public Connection getTransportConnection()

View File

@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler
/** /**
* creates a new ActiveMQBuffer of the specified size. * creates a new ActiveMQBuffer of the specified size.
* For the purpose of i/o outgoing packets
* *
* @param size the size of buffer required * @param size the size of buffer required
* @return the buffer * @return the buffer
*/ */
ActiveMQBuffer createBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
/** /**
* called when the underlying connection fails. * called when the underlying connection fails.

View File

@ -36,7 +36,7 @@ public interface Connection
* @param size the size of buffer to create * @param size the size of buffer to create
* @return the new buffer. * @return the new buffer.
*/ */
ActiveMQBuffer createBuffer(int size); ActiveMQBuffer createTransportBuffer(int size);
RemotingConnection getProtocolConnection(); RemotingConnection getProtocolConnection();

View File

@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
} }
@Override @Override
public ActiveMQBuffer createBuffer(int size) public ActiveMQBuffer createTransportBuffer(int size)
{ {
return ActiveMQBuffers.dynamicBuffer(size); return ActiveMQBuffers.dynamicBuffer(size);
} }

View File

@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection
} }
@Override @Override
public ActiveMQBuffer createBuffer(int size) public ActiveMQBuffer createTransportBuffer(int size)
{ {
return ActiveMQBuffers.dynamicBuffer(size); return ActiveMQBuffers.dynamicBuffer(size);
} }

View File

@ -142,7 +142,7 @@ public class InVMConnection implements Connection
} }
} }
public ActiveMQBuffer createBuffer(final int size) public ActiveMQBuffer createTransportBuffer(final int size)
{ {
return ActiveMQBuffers.dynamicBuffer(size); return ActiveMQBuffers.dynamicBuffer(size);
} }

View File

@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection
} }
@Override @Override
public ActiveMQBuffer createBuffer(int size) public ActiveMQBuffer createTransportBuffer(int size)
{ {
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
} }

View File

@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
} }
/* (non-Javadoc) /* (non-Javadoc)
* @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[]) * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[])
*/ */
public ActiveMQBuffer createBuffer(final byte[] bytes) public ActiveMQBuffer createBuffer(final byte[] bytes)
{ {
@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
} }
/* (non-Javadoc) /* (non-Javadoc)
* @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int) * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int)
*/ */
public ActiveMQBuffer createBuffer(final int size) public ActiveMQBuffer createBuffer(final int size)
{ {

View File

@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase
final int size = 1234; final int size = 1234;
ActiveMQBuffer buff = conn.createBuffer(size); ActiveMQBuffer buff = conn.createTransportBuffer(size);
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
Assert.assertEquals(size, buff.capacity()); Assert.assertEquals(size, buff.capacity());