diff --git a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java index a3fa5b512d..53d7306173 100644 --- a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java +++ b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java @@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer protected ByteBuf buffer; // NO_UCD (use final) private final boolean releasable; + public static ByteBuf unwrap(ByteBuf buffer) + { + ByteBuf parent; + while ((parent = buffer.unwrap()) != null && + parent != buffer) // this last part is just in case the semantic + { // ever changes where unwrap is returning itself + buffer = parent; + } + + return buffer; + } + public ChannelBufferWrapper(final ByteBuf buffer) { this(buffer, false); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index 96b3e2b6a4..1cd342d806 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { - super(buffer.byteBuf()); + // a wrapped inside a wrapper will increase the stack size. + // we fixed this here due to some profiling testing + super(unwrap(buffer.byteBuf())); this.limit = limit; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java index 4027c67c77..6b23bffca3 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java @@ -85,6 +85,4 @@ public interface Packet * @return true if confirmation is required */ boolean isRequiresConfirmations(); - - boolean isAsyncExec(); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java index b7366df301..890e8d097b 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling String handshake = "HORNETQ"; - ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length()); + ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length()); amqbuffer.writeBytes(handshake.getBytes()); transportConnection.write(amqbuffer); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java index e2bdc28a56..61e0eca07f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java @@ -276,7 +276,7 @@ public class PacketImpl implements Packet public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE); + ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); // The standard header fields @@ -333,11 +333,6 @@ public class PacketImpl implements Packet return true; } - public boolean isAsyncExec() - { - return false; - } - @Override public String toString() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java index 5ef4cfd9ea..88208501f0 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java @@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement private final Object failLock = new Object(); - private volatile boolean executing; - private final SimpleString nodeID; private String clientID; @@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement ActiveMQClientLogger.LOGGER.trace("handling packet " + packet); } - if (packet.isAsyncExec() && executor != null) - { - executing = true; - - executor.execute(new Runnable() - { - public void run() - { - try - { - doBufferReceived(packet); - } - catch (Throwable t) - { - ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet); - } - - executing = false; - } - }); - } - else - { - //To prevent out of order execution if interleaving sync and async operations on same connection - while (executing) - { - Thread.yield(); - } - - // Pings must always be handled out of band so we can send pings back to the client quickly - // otherwise they would get in the queue with everything else which might give an intolerable delay - doBufferReceived(packet); - } + dataReceived = true; + doBufferReceived(packet); super.bufferReceived(connectionID, buffer); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java index 41c5735207..340c73adfd 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java @@ -69,12 +69,6 @@ public class RollbackMessage extends PacketImpl considerLastMessageAsDelivered = buffer.readBoolean(); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java index 1c8a276952..dc61860147 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java @@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl // TODO return 0; } - - @Override - public boolean isAsyncExec() - { - return true; - } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java index c7242fb1f4..1b1e081531 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java @@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl super(SESS_COMMIT); } - @Override - public boolean isAsyncExec() - { - return true; - } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java index 65fdf33868..14668cb1fb 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java @@ -55,12 +55,6 @@ public class SessionXACommitMessage extends PacketImpl return onePhase; } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public void encodeRest(final ActiveMQBuffer buffer) { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java index b9a531a4c7..8ff5b1531c 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java @@ -61,12 +61,6 @@ public class SessionXAPrepareMessage extends PacketImpl xid = XidCodecSupport.decodeXid(buffer); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java index 8efab01774..272386dc17 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java @@ -62,12 +62,6 @@ public class SessionXARollbackMessage extends PacketImpl xid = XidCodecSupport.decodeXid(buffer); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java index a73aa1bfa7..c7eafe76c1 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java @@ -172,9 +172,9 @@ public class NettyConnection implements Connection listener.connectionDestroyed(getID()); } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { - return new ChannelBufferWrapper(channel.alloc().buffer(size)); + return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); } public Object getID() @@ -199,7 +199,7 @@ public class NettyConnection implements Connection { channel.writeAndFlush(batchBuffer.byteBuf()); - batchBuffer = createBuffer(BATCHING_BUFFER_SIZE); + batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE); } } finally diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java index 3ed9e879cf..8425edd53c 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); - bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false)); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java index a48845fa9b..22b26eeb14 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java @@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection closeListeners.addAll(listeners); } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { - return transportConnection.createBuffer(size); + return transportConnection.createTransportBuffer(size); } public Connection getTransportConnection() diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java index 186e098ddf..9eb287ef27 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java @@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler /** * creates a new ActiveMQBuffer of the specified size. + * For the purpose of i/o outgoing packets * * @param size the size of buffer required * @return the buffer */ - ActiveMQBuffer createBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size); /** * called when the underlying connection fails. diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java index a4896e29ce..b6060d3a12 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java @@ -36,7 +36,7 @@ public interface Connection * @param size the size of buffer to create * @return the new buffer. */ - ActiveMQBuffer createBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size); RemotingConnection getProtocolConnection(); diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 20762e2cc9..bf906bca73 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java index 16cf55e6c5..9a4e7b726d 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java @@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java index eed1ff4a35..37e2acba3e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java @@ -142,7 +142,7 @@ public class InVMConnection implements Connection } } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java index d899e8583f..339b407130 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java @@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java index 784a0c063e..cfbf9956c9 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java @@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase } /* (non-Javadoc) - * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[]) + * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[]) */ public ActiveMQBuffer createBuffer(final byte[] bytes) { @@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase } /* (non-Javadoc) - * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int) + * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int) */ public ActiveMQBuffer createBuffer(final int size) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index cc9d57b70d..b0a4c14802 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase final int size = 1234; - ActiveMQBuffer buff = conn.createBuffer(size); + ActiveMQBuffer buff = conn.createTransportBuffer(size); buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. Assert.assertEquals(size, buff.capacity());