From 582a689cdba77978e55dd1cc41e64f4e9bc1b167 Mon Sep 17 00:00:00 2001 From: a181321 Date: Tue, 28 Feb 2023 18:39:44 +0100 Subject: [PATCH] ARTEMIS-4186 Ability to set compressionLevel for compressLargeMessages --- .../api/config/ServerLocatorConfig.java | 2 + .../api/core/client/ActiveMQClient.java | 2 + .../api/core/client/ServerLocator.java | 16 +++++ .../core/client/impl/ClientProducerImpl.java | 1 + .../client/impl/ClientSessionFactoryImpl.java | 2 +- .../core/client/impl/ClientSessionImpl.java | 10 +++ .../client/impl/ClientSessionInternal.java | 2 + .../core/client/impl/ServerLocatorImpl.java | 11 +++ .../artemis/utils/DeflaterReader.java | 4 ++ .../jms/client/ActiveMQConnectionFactory.java | 8 +++ .../artemis/jms/server/JMSServerManager.java | 2 + .../ConnectionFactoryConfiguration.java | 4 ++ .../ConnectionFactoryConfigurationImpl.java | 22 +++++- .../jms/server/impl/JMSServerManagerImpl.java | 5 +- .../ActiveMQRAManagedConnectionFactory.java | 8 +++ .../artemis/ra/ActiveMQResourceAdapter.java | 27 +++++++ .../ra/ConnectionFactoryProperties.java | 11 +++ docs/user-manual/en/large-messages.md | 9 +++ .../client/LargeMessageCompressTest.java | 70 +++++++++++++++++++ .../integration/jms/client/PreACKJMSTest.java | 2 +- .../jms/client/ReSendMessageTest.java | 2 +- ...ClosedOnRemotingConnectionFailureTest.java | 2 +- .../jms/client/TextMessageTest.java | 2 +- .../jms/connection/InvalidConnectorTest.java | 2 +- .../jms/divert/DivertAndACKClientTest.java | 2 +- .../tools/container/LocalTestServer.java | 2 +- .../ra/ActiveMQResourceAdapterConfigTest.java | 6 ++ 27 files changed, 226 insertions(+), 10 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java index 201ec8dd01..7196c5a39b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java @@ -47,6 +47,7 @@ public class ServerLocatorConfig { public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS; public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; + public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL; public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING; @@ -84,5 +85,6 @@ public class ServerLocatorConfig { failoverAttempts = locator.failoverAttempts; initialMessagePacketSize = locator.initialMessagePacketSize; useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing; + compressionLevel = locator.compressionLevel; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index f4134efbe1..0b911a2665 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -69,6 +69,8 @@ public final class ActiveMQClient { public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false; + public static final int DEFAULT_COMPRESSION_LEVEL = -1; + public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024; public static final int DEFAULT_CONSUMER_MAX_RATE = -1; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 2602d3f767..b7e6083f45 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -809,6 +809,22 @@ public interface ServerLocator extends AutoCloseable { */ ServerLocator setCompressLargeMessage(boolean compressLargeMessages); + /** + * What compression level is in use + * + * @return + */ + int getCompressionLevel(); + + /** + * Sets what compressionLevel to use when compressing messages + * Value must be -1 (default), or 0-9 + * + * @param compressionLevel + * @return this ServerLocator + */ + ServerLocator setCompressionLevel(int compressionLevel); + // XXX No javadocs ServerLocator addClusterTopologyListener(ClusterTopologyListener listener); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index f3fd2e1f69..f3b01a7eca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -441,6 +441,7 @@ public class ClientProducerImpl implements ClientProducerInternal { if (session.isCompressLargeMessages()) { msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true); deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); + deflaterReader.setLevel(session.getCompressionLevel()); input = deflaterReader; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 6f2fc28053..86d0b9465d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -836,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index e388673645..54a2b86cbb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -123,6 +123,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final boolean compressLargeMessages; + private final int compressionLevel; + private volatile int initialMessagePacketSize; private final boolean cacheLargeMessageClient; @@ -184,6 +186,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final boolean cacheLargeMessageClient, final int minLargeMessageSize, final boolean compressLargeMessages, + final int compressionLevel, final int initialMessagePacketSize, final String groupID, final SessionContext sessionContext, @@ -237,6 +240,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi this.compressLargeMessages = compressLargeMessages; + this.compressionLevel = compressionLevel; + this.initialMessagePacketSize = initialMessagePacketSize; this.groupID = groupID; @@ -1186,6 +1191,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return compressLargeMessages; } + @Override + public int getCompressionLevel() { + return compressionLevel; + } + /** * @return the cacheLargeMessageClient */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index b2833a2344..0c13fbdba6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -41,6 +41,8 @@ public interface ClientSessionInternal extends ClientSession { boolean isCompressLargeMessages(); + int getCompressionLevel(); + void expire(ClientConsumer consumer, Message message) throws ActiveMQException; void addConsumer(ClientConsumerInternal consumer); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 3fab8940fd..af7b304c86 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1292,6 +1292,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return this; } + @Override + public int getCompressionLevel() { + return config.compressionLevel; + } + + @Override + public ServerLocatorImpl setCompressionLevel(int compressionLevel) { + this.config.compressionLevel = compressionLevel; + return this; + } + private void checkWrite() { synchronized (stateGuard) { if (state != null && state != STATE.CLOSED) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java index 4443687bde..91f0a0b7f6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java @@ -117,4 +117,8 @@ public class DeflaterReader extends InputStream { return bytesRead.get(); } + public void setLevel(int level) { + deflater.setLevel(level); + } + } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index 9c53e34311..c0e78c8123 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -854,6 +854,14 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio serverLocator.setCompressLargeMessage(avoidLargeMessages); } + public int getCompressionLevel() { + return serverLocator.getCompressionLevel(); + } + + public void setCompressionLevel(int compressionLevel) { + serverLocator.setCompressionLevel(compressionLevel); + } + @Override public void close() { ServerLocator locator0 = serverLocator; diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java index c89c392d28..e1b22f0b8d 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/JMSServerManager.java @@ -246,6 +246,7 @@ public interface JMSServerManager extends ActiveMQComponent { boolean cacheLargeMessagesClient, int minLargeMessageSize, boolean compressLargeMessage, + int compressionLevel, int consumerWindowSize, int consumerMaxRate, int confirmationWindowSize, @@ -282,6 +283,7 @@ public interface JMSServerManager extends ActiveMQComponent { boolean cacheLargeMessagesClient, int minLargeMessageSize, boolean compressLargeMessages, + int compressionLevel, int consumerWindowSize, int consumerMaxRate, int confirmationWindowSize, diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java index c02631a722..463bda3dd8 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/ConnectionFactoryConfiguration.java @@ -82,6 +82,10 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport { ConnectionFactoryConfiguration setCompressLargeMessages(boolean avoidLargeMessages); + int getCompressionLevel(); + + ConnectionFactoryConfiguration setCompressionLevel(int compressionLevel); + int getConsumerWindowSize(); ConnectionFactoryConfiguration setConsumerWindowSize(int consumerWindowSize); diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java index e233fec440..a76a436015 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java @@ -67,6 +67,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf private boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; + private int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL; + private int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; private int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE; @@ -281,6 +283,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return this; } + @Override + public int getCompressionLevel() { + return compressionLevel; + } + + @Override + public ConnectionFactoryConfiguration setCompressionLevel(final int compressionLevel) { + this.compressionLevel = compressionLevel; + return this; + } + @Override public int getConsumerWindowSize() { return consumerWindowSize; @@ -642,6 +655,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf enableSharedClientID = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID; useTopologyForLoadBalancing = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING; + + compressionLevel = buffer.readableBytes() > 0 ? BufferHelper.readNullableInteger(buffer) : ActiveMQClient.DEFAULT_COMPRESSION_LEVEL; + } @Override @@ -738,6 +754,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.writeNullableBoolean(buffer, enableSharedClientID); BufferHelper.writeNullableBoolean(buffer, useTopologyForLoadBalancing); + + BufferHelper.writeNullableInteger(buffer, compressionLevel); } @Override @@ -858,7 +876,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf BufferHelper.sizeOfNullableBoolean(enableSharedClientID) + - BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing); + BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing) + + + BufferHelper.sizeOfNullableInteger(compressionLevel); return size; } diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index e256237f1a..76ff364c6d 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -872,6 +872,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM final boolean cacheLargeMessagesClient, final int minLargeMessageSize, final boolean compressLargeMessage, + final int compressionLevel, final int consumerWindowSize, final int consumerMaxRate, final int confirmationWindowSize, @@ -917,6 +918,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM final boolean cacheLargeMessagesClient, final int minLargeMessageSize, final boolean compressLargeMessages, + final int compressionLevel, final int consumerWindowSize, final int consumerMaxRate, final int confirmationWindowSize, @@ -943,7 +945,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM checkInitialised(); ActiveMQConnectionFactory cf = connectionFactories.get(name); if (cf == null) { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setCompressionLevel(compressionLevel).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection); createConnectionFactory(true, configuration, registryBindings); } } @@ -1212,6 +1214,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM cf.setReconnectAttempts(cfConfig.getReconnectAttempts()); cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection()); cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages()); + cf.setCompressionLevel(cfConfig.getCompressionLevel()); cf.setGroupID(cfConfig.getGroupID()); cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr()); cf.setDeserializationBlackList(cfConfig.getDeserializationBlackList()); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java index c88e69ffb0..a6afe259b8 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java @@ -689,6 +689,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti mcfProperties.setCompressLargeMessage(compressLargeMessage); } + public Integer getCompressionLevel() { + return mcfProperties.getCompressionLevel(); + } + + public void setCompressionLevel(final Integer compressionLevel) { + mcfProperties.setCompressionLevel(compressionLevel); + } + public Integer getInitialConnectAttempts() { return mcfProperties.getInitialConnectAttempts(); } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index 9e6578a9d1..4a7982d384 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -640,6 +640,29 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { raProperties.setCompressLargeMessage(compressLargeMessage); } + /** + * Get compressionLevel + * + * @return The value + */ + public Integer getCompressionLevel() { + logger.trace("getCompressionLevel()"); + + return raProperties.getCompressionLevel(); + } + + /** + * Sets what compressionLevel to use when compressing messages + * Value must be -1 (default) or 0-9 + * + * @param compressionLevel The value + */ + public void setCompressionLevel(final Integer compressionLevel) { + logger.trace("setCompressionLevel({})", compressionLevel); + + raProperties.setCompressionLevel(compressionLevel); + } + /** * Get call timeout * @@ -1828,6 +1851,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { if (val2 != null) { cf.setInitialMessagePacketSize(val2); } + val2 = overrideProperties.getCompressionLevel() != null ? overrideProperties.getCompressionLevel() : raProperties.getCompressionLevel(); + if (val2 != null) { + cf.setCompressionLevel(val2); + } Long val3 = overrideProperties.getClientFailureCheckPeriod() != null ? overrideProperties.getClientFailureCheckPeriod() : raProperties.getClientFailureCheckPeriod(); if (val3 != null) { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index b370a34064..1967819c9c 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -76,6 +76,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions { private Boolean compressLargeMessage; + private Integer compressionLevel; + private Integer consumerWindowSize; private Integer producerWindowSize; @@ -178,6 +180,15 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions { this.compressLargeMessage = compressLargeMessage; } + public Integer getCompressionLevel() { + return compressionLevel; + } + + public void setCompressionLevel(Integer compressionLevel) { + hasBeenUpdated = true; + this.compressionLevel = compressionLevel; + } + public String getConnectionLoadBalancingPolicyClassName() { logger.trace("getConnectionLoadBalancingPolicyClassName()"); diff --git a/docs/user-manual/en/large-messages.md b/docs/user-manual/en/large-messages.md index 57068c4871..ca7476e6e1 100644 --- a/docs/user-manual/en/large-messages.md +++ b/docs/user-manual/en/large-messages.md @@ -68,11 +68,20 @@ is transferred to the server's side. Notice that there's no special treatment at the server's side, all the compressing and uncompressing is done at the client. +This behavior can be tuned further by setting an optional parameter: `compressionLevel`. +This will decide how much the message body should be compressed. `compressionLevel` +accepts an integer of `-1` or a value between `0-9`. The default value is `-1` which +corresponds to around level 6-7. + If the compressed size of a large message is below `minLargeMessageSize`, it is sent to server as regular messages. This means that the message won't be written into the server's large-message data directory, thus reducing the disk I/O. +**Note:** A higher `compressionLevel` means the message body will get further compressed, +but this is at the cost of speed and computational overhead. Make sure to tune this value +according to its specific use-case. + ## Streaming large messages from Core Protocol Apache ActiveMQ Artemis supports setting the body of messages using input and diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index edc83e3441..2d48f8639a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -31,6 +31,7 @@ import java.util.zip.Deflater; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -38,6 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -510,6 +513,73 @@ public class LargeMessageCompressTest extends LargeMessageTest { validateNoFilesOnLargeDir(); } + @Test + public void testLargeMessageCompressionLevel() throws Exception { + + SimpleString address1 = new SimpleString("address1"); + SimpleString address2 = new SimpleString("address2"); + SimpleString address3 = new SimpleString("address3"); + + ActiveMQServer server = createServer(true, true); + server.start(); + + ServerLocator locator1 = ActiveMQClient.createServerLocator("tcp://localhost:61616?compressionLevel=1"); + ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0?compressionLevel=5"); + ServerLocator locator3 = ActiveMQClient.createServerLocator("vm://0"); + locator1.setCompressLargeMessage(true); + locator2.setCompressLargeMessage(true); + locator3.setCompressLargeMessage(true); + locator3.setCompressionLevel(9); + + ClientSessionFactory sf1 = locator1.createSessionFactory(); + ClientSessionFactory sf2 = locator2.createSessionFactory(); + ClientSessionFactory sf3 = locator3.createSessionFactory(); + ClientSession session1 = sf1.createSession(false, true, true); + ClientSession session2 = sf2.createSession(false, true, true); + ClientSession session3 = sf3.createSession(false, true, true); + + ClientProducer producer1 = session1.createProducer(address1); + ClientProducer producer2 = session2.createProducer(address2); + ClientProducer producer3 = session3.createProducer(address3); + + session1.createQueue(new QueueConfiguration(address1)); + session2.createQueue(new QueueConfiguration(address2)); + session3.createQueue(new QueueConfiguration(address3)); + + String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla"; + for (int i = 0; i < 20; i++) { + inputString = inputString + inputString; + } + + ClientMessage message = session1.createMessage(true); + message.getBodyBuffer().writeString(inputString); + producer1.send(message); + producer2.send(message); + producer3.send(message); + + QueueControl queueControl1 = (QueueControl)server.getManagementService(). + getResource(ResourceNames.QUEUE + address1); + QueueControl queueControl2 = (QueueControl)server.getManagementService(). + getResource(ResourceNames.QUEUE + address2); + QueueControl queueControl3 = (QueueControl)server.getManagementService(). + getResource(ResourceNames.QUEUE + address3); + + assertTrue(1 == queueControl1.getMessageCount()); + assertTrue(1 == queueControl2.getMessageCount()); + assertTrue(1 == queueControl3.getMessageCount()); + assertTrue(message.getPersistentSize() > queueControl1.getPersistentSize()); + assertTrue(queueControl1.getPersistentSize() > queueControl2.getPersistentSize()); + assertTrue(queueControl2.getPersistentSize() > queueControl3.getPersistentSize()); + + sf1.close(); + sf2.close(); + sf3.close(); + locator1.close(); + locator2.close(); + locator3.close(); + + } + @Override @Test public void testSendServerMessage() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/PreACKJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/PreACKJMSTest.java index 608ac8ecab..fe0157b63e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/PreACKJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/PreACKJMSTest.java @@ -149,7 +149,7 @@ public class PreACKJMSTest extends JMSTestBase { ArrayList connectors = registerConnectors(server, connectorConfigs); - jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, connectors, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, true, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); + jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, connectors, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, true, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReSendMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReSendMessageTest.java index cbadb6ab09..91c29cb21e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReSendMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReSendMessageTest.java @@ -225,7 +225,7 @@ public class ReSendMessageTest extends JMSTestBase { int reconnectAttempts = -1; int callTimeout = 30000; - jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); + jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java index d511f1951e..4ac88ab79a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java @@ -49,7 +49,7 @@ public class SessionClosedOnRemotingConnectionFailureTest extends JMSTestBase { List connectorConfigs = new ArrayList<>(); connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); - jmsServer.createConnectionFactory("cffoo", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/cffoo"); + jmsServer.createConnectionFactory("cffoo", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/cffoo"); cf = (ConnectionFactory) namingContext.lookup("/cffoo"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TextMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TextMessageTest.java index eeed30af0e..ccedc29c21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TextMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TextMessageTest.java @@ -176,6 +176,6 @@ public class TextMessageTest extends JMSTestBase { int reconnectAttempts = -1; int callTimeout = 30000; - jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); + jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/InvalidConnectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/InvalidConnectorTest.java index 7348e2a16b..97e9bbab51 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/InvalidConnectorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/InvalidConnectorTest.java @@ -45,7 +45,7 @@ public class InvalidConnectorTest extends JMSTestBase { int reconnectAttempts = -1; int callTimeout = 30000; - jmsServer.createConnectionFactory("invalid-cf", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/invalid-cf"); + jmsServer.createConnectionFactory("invalid-cf", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/invalid-cf"); ActiveMQConnectionFactory invalidCf = (ActiveMQConnectionFactory) namingContext.lookup("/invalid-cf"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/divert/DivertAndACKClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/divert/DivertAndACKClientTest.java index aad33238b9..4baf5f48e7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/divert/DivertAndACKClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/divert/DivertAndACKClientTest.java @@ -108,7 +108,7 @@ public class DivertAndACKClientTest extends JMSTestBase { int reconnectAttempts = -1; int callTimeout = 30000; - jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, // this test needs to block on ACK + jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, // this test needs to block on ACK ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java index 52986b413d..dcf540205a 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/tools/container/LocalTestServer.java @@ -277,7 +277,7 @@ public class LocalTestServer implements Server, Runnable { ArrayList connectors = new ArrayList<>(); connectors.add("netty"); - getJMSServerManager().createConnectionFactory(objectName, false, type, connectors, clientId, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, prefetchSize, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, blockOnAcknowledge, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, dupsOkBatchSize, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); + getJMSServerManager().createConnectionFactory(objectName, false, type, connectors, clientId, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, prefetchSize, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, blockOnAcknowledge, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, dupsOkBatchSize, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings); } @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java index 8a0b86e7a1..de463f79cb 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java @@ -342,6 +342,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase { " false" + " \n" + " \n" + + " The level of compression to use. Must be -1 or between 0-9" + + " CompressionLevel" + + " int" + + " -1" + + " \n" + + " \n" + " The timeout in milliseconds for failover call (or -1 for infinite)\n" + " CallFailoverTimeout\n" + " long\n" +