diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index 93be28182c..cc4407c88e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { ByteBuf buffer = (ByteBuf) msg; - handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer)); + try { + handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer)); + } finally { + buffer.release(); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 33dbf4b8b4..c3a71c56d7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.Semaphore; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -215,7 +216,7 @@ public class NettyConnection implements Connection { @Override public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { - return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); + return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 3f226ae934..6211e8f0f6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -407,7 +407,6 @@ public class NettyConnector extends AbstractConnector { } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java deleted file mode 100644 index 5e67952ec7..0000000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.remoting.impl.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; - -/** - * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest - * is unpooled. - */ -public class PartialPooledByteBufAllocator implements ByteBufAllocator { - - private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT; - private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); - - public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); - - private PartialPooledByteBufAllocator() { - } - - @Override - public ByteBuf buffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf buffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf buffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf ioBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf directBuffer() { - return POOLED.directBuffer(); - } - - @Override - public ByteBuf directBuffer(int initialCapacity) { - return POOLED.directBuffer(initialCapacity); - } - - @Override - public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - return POOLED.directBuffer(initialCapacity, maxCapacity); - } - - @Override - public CompositeByteBuf compositeBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeHeapBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeDirectBuffer() { - return POOLED.compositeDirectBuffer(); - } - - @Override - public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { - return POOLED.compositeDirectBuffer(); - } - - @Override - public boolean isDirectBufferPooled() { - return true; - } - - @Override - public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { - return UNPOOLED.calculateNewCapacity(minNewCapacity, maxCapacity); - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 340861be55..ca78f29155 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnabl import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -185,10 +184,6 @@ public class ProtocolHandler { protocolManagerToUse.handshake(connection, new ChannelBufferWrapper(in)); pipeline.remove(this); - // https://issues.apache.org/jira/browse/ARTEMIS-392 - // Application servers or other components may upgrade a regular socket to Netty - // We need to be able to work normally as with anything else on Artemis - ctx.channel().config().setAllocator(PartialPooledByteBufAllocator.INSTANCE); ctx.flush(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 9088e57499..df0810813c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -319,7 +319,6 @@ public class NettyAcceptor extends AbstractAcceptor { bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); - bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE); serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java index d4b9f5401c..29963a02d2 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -268,7 +268,6 @@ public class NettyTcpTransport implements NettyTransport { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); if (options.getSendBufferSize() != -1) { bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java index 8a34a4b38a..f75a52ebb9 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java @@ -285,7 +285,6 @@ public class NettyWSTransport implements NettyTransport { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); if (options.getSendBufferSize() != -1) { bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java deleted file mode 100644 index 12f5568f0e..0000000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; - -/** - * A {@link ByteBufAllocator} which is partial pooled. Which means only direct - * {@link ByteBuf}s are pooled. The rest is unpooled. - */ -public class PartialPooledByteBufAllocator implements ByteBufAllocator { - - private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT; - private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); - - public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); - - private PartialPooledByteBufAllocator() { - } - - @Override - public ByteBuf buffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf buffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf buffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf ioBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf heapBuffer() { - return UNPOOLED.heapBuffer(); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity) { - return UNPOOLED.heapBuffer(initialCapacity); - } - - @Override - public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { - return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); - } - - @Override - public ByteBuf directBuffer() { - return POOLED.directBuffer(); - } - - @Override - public ByteBuf directBuffer(int initialCapacity) { - return POOLED.directBuffer(initialCapacity); - } - - @Override - public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - return POOLED.directBuffer(initialCapacity, maxCapacity); - } - - @Override - public CompositeByteBuf compositeBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeHeapBuffer() { - return UNPOOLED.compositeHeapBuffer(); - } - - @Override - public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { - return UNPOOLED.compositeHeapBuffer(maxNumComponents); - } - - @Override - public CompositeByteBuf compositeDirectBuffer() { - return POOLED.compositeDirectBuffer(); - } - - @Override - public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { - return POOLED.compositeDirectBuffer(); - } - - @Override - public boolean isDirectBufferPooled() { - return true; - } -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java index 0afd30cb26..0f08ecd8a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java @@ -26,7 +26,6 @@ import java.util.Map; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; @@ -52,7 +51,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; -import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -216,7 +214,6 @@ public class NettyConnectorWithHTTPUpgradeTest extends ActiveMQTestBase { } else { context = null; } - b.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception {