ARTEMIS-1087 Make InVM buffer pooling configurable
(cherry picked from commit 8760b3ddfd
)
This commit is contained in:
parent
7e4cb4d181
commit
bae011bafb
|
@ -27,7 +27,6 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
*/
|
*/
|
||||||
public final class ActiveMQBuffers {
|
public final class ActiveMQBuffers {
|
||||||
|
|
||||||
|
|
||||||
private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;
|
private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,7 +43,6 @@ public final class ActiveMQBuffers {
|
||||||
return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true);
|
return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array
|
* Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array
|
||||||
*
|
*
|
||||||
|
|
|
@ -72,6 +72,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(InVMAcceptor.class);
|
private static final Logger logger = Logger.getLogger(InVMAcceptor.class);
|
||||||
|
|
||||||
|
private final boolean enableBufferPooling;
|
||||||
|
|
||||||
public InVMAcceptor(final String name,
|
public InVMAcceptor(final String name,
|
||||||
final ClusterConnection clusterConnection,
|
final ClusterConnection clusterConnection,
|
||||||
final Map<String, Object> configuration,
|
final Map<String, Object> configuration,
|
||||||
|
@ -96,6 +98,8 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
||||||
executorFactory = new OrderedExecutorFactory(threadPool);
|
executorFactory = new OrderedExecutorFactory(threadPool);
|
||||||
|
|
||||||
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration);
|
connectionsAllowed = ConfigurationHelper.getLongProperty(TransportConstants.CONNECTIONS_ALLOWED, TransportConstants.DEFAULT_CONNECTIONS_ALLOWED, configuration);
|
||||||
|
|
||||||
|
enableBufferPooling = ConfigurationHelper.getBooleanProperty(TransportConstants.BUFFER_POOLING, TransportConstants.DEFAULT_BUFFER_POOLING, configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -222,6 +226,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
||||||
Listener connectionListener = new Listener(connector);
|
Listener connectionListener = new Listener(connector);
|
||||||
|
|
||||||
InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal);
|
InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal);
|
||||||
|
inVMConnection.setEnableBufferPooling(enableBufferPooling);
|
||||||
|
|
||||||
connectionListener.connectionCreated(this, inVMConnection, protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL));
|
connectionListener.connectionCreated(this, inVMConnection, protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL));
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,8 @@ public class InVMConnection implements Connection {
|
||||||
|
|
||||||
private RemotingConnection protocolConnection;
|
private RemotingConnection protocolConnection;
|
||||||
|
|
||||||
|
private boolean bufferPoolingEnabled = TransportConstants.DEFAULT_BUFFER_POOLING;
|
||||||
|
|
||||||
public InVMConnection(final int serverID,
|
public InVMConnection(final int serverID,
|
||||||
final BufferHandler handler,
|
final BufferHandler handler,
|
||||||
final BaseConnectionLifeCycleListener listener,
|
final BaseConnectionLifeCycleListener listener,
|
||||||
|
@ -97,6 +99,10 @@ public class InVMConnection implements Connection {
|
||||||
this.defaultActiveMQPrincipal = defaultActiveMQPrincipal;
|
this.defaultActiveMQPrincipal = defaultActiveMQPrincipal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setEnableBufferPooling(boolean enableBufferPooling) {
|
||||||
|
this.bufferPoolingEnabled = enableBufferPooling;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forceClose() {
|
public void forceClose() {
|
||||||
// no op
|
// no op
|
||||||
|
@ -151,11 +157,10 @@ public class InVMConnection implements Connection {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
|
public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
|
||||||
if ( pooled ) {
|
if (bufferPoolingEnabled && pooled) {
|
||||||
return ActiveMQBuffers.pooledBuffer( size );
|
return ActiveMQBuffers.pooledBuffer( size );
|
||||||
} else {
|
|
||||||
return ActiveMQBuffers.dynamicBuffer( size );
|
|
||||||
}
|
}
|
||||||
|
return ActiveMQBuffers.dynamicBuffer( size );
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -95,6 +95,8 @@ public class InVMConnector extends AbstractConnector {
|
||||||
|
|
||||||
private final Executor closeExecutor;
|
private final Executor closeExecutor;
|
||||||
|
|
||||||
|
private final boolean bufferPoolingEnabled;
|
||||||
|
|
||||||
private static ExecutorService threadPoolExecutor;
|
private static ExecutorService threadPoolExecutor;
|
||||||
|
|
||||||
public static synchronized void resetThreadPool() {
|
public static synchronized void resetThreadPool() {
|
||||||
|
@ -126,6 +128,8 @@ public class InVMConnector extends AbstractConnector {
|
||||||
|
|
||||||
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
|
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
|
||||||
|
|
||||||
|
bufferPoolingEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.BUFFER_POOLING, TransportConstants.DEFAULT_BUFFER_POOLING, configuration);
|
||||||
|
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
|
|
||||||
this.closeExecutor = closeExecutor;
|
this.closeExecutor = closeExecutor;
|
||||||
|
@ -215,6 +219,8 @@ public class InVMConnector extends AbstractConnector {
|
||||||
final Executor serverExecutor) {
|
final Executor serverExecutor) {
|
||||||
// No acceptor on a client connection
|
// No acceptor on a client connection
|
||||||
InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor);
|
InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor);
|
||||||
|
inVMConnection.setEnableBufferPooling(bufferPoolingEnabled);
|
||||||
|
|
||||||
listener.connectionCreated(null, inVMConnection, protocolManager);
|
listener.connectionCreated(null, inVMConnection, protocolManager);
|
||||||
return inVMConnection;
|
return inVMConnection;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,10 @@ public final class TransportConstants {
|
||||||
|
|
||||||
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
|
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
|
||||||
|
|
||||||
|
public static final String BUFFER_POOLING = "bufferPooling";
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_BUFFER_POOLING = true;
|
||||||
|
|
||||||
private TransportConstants() {
|
private TransportConstants() {
|
||||||
// Utility class
|
// Utility class
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue