ARTEMIS-4186 Ability to set compressionLevel for compressLargeMessages
This commit is contained in:
parent
600799de30
commit
582a689cdb
|
@ -47,6 +47,7 @@ public class ServerLocatorConfig {
|
|||
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
|
||||
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
|
||||
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
|
||||
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
|
||||
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
|
||||
public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
|
||||
|
||||
|
@ -84,5 +85,6 @@ public class ServerLocatorConfig {
|
|||
failoverAttempts = locator.failoverAttempts;
|
||||
initialMessagePacketSize = locator.initialMessagePacketSize;
|
||||
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
|
||||
compressionLevel = locator.compressionLevel;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,8 @@ public final class ActiveMQClient {
|
|||
|
||||
public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
|
||||
|
||||
public static final int DEFAULT_COMPRESSION_LEVEL = -1;
|
||||
|
||||
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
|
||||
|
||||
public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
|
||||
|
|
|
@ -809,6 +809,22 @@ public interface ServerLocator extends AutoCloseable {
|
|||
*/
|
||||
ServerLocator setCompressLargeMessage(boolean compressLargeMessages);
|
||||
|
||||
/**
|
||||
* What compression level is in use
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
int getCompressionLevel();
|
||||
|
||||
/**
|
||||
* Sets what compressionLevel to use when compressing messages
|
||||
* Value must be -1 (default), or 0-9
|
||||
*
|
||||
* @param compressionLevel
|
||||
* @return this ServerLocator
|
||||
*/
|
||||
ServerLocator setCompressionLevel(int compressionLevel);
|
||||
|
||||
// XXX No javadocs
|
||||
ServerLocator addClusterTopologyListener(ClusterTopologyListener listener);
|
||||
|
||||
|
|
|
@ -441,6 +441,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
|||
if (session.isCompressLargeMessages()) {
|
||||
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
|
||||
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
|
||||
deflaterReader.setLevel(session.getCompressionLevel());
|
||||
input = deflaterReader;
|
||||
}
|
||||
|
||||
|
|
|
@ -836,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
|
||||
|
||||
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
|
||||
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
|
||||
|
||||
synchronized (sessions) {
|
||||
if (closed || !clientProtocolManager.isAlive()) {
|
||||
|
|
|
@ -123,6 +123,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
private final boolean compressLargeMessages;
|
||||
|
||||
private final int compressionLevel;
|
||||
|
||||
private volatile int initialMessagePacketSize;
|
||||
|
||||
private final boolean cacheLargeMessageClient;
|
||||
|
@ -184,6 +186,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
final boolean cacheLargeMessageClient,
|
||||
final int minLargeMessageSize,
|
||||
final boolean compressLargeMessages,
|
||||
final int compressionLevel,
|
||||
final int initialMessagePacketSize,
|
||||
final String groupID,
|
||||
final SessionContext sessionContext,
|
||||
|
@ -237,6 +240,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
this.compressLargeMessages = compressLargeMessages;
|
||||
|
||||
this.compressionLevel = compressionLevel;
|
||||
|
||||
this.initialMessagePacketSize = initialMessagePacketSize;
|
||||
|
||||
this.groupID = groupID;
|
||||
|
@ -1186,6 +1191,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
return compressLargeMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCompressionLevel() {
|
||||
return compressionLevel;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the cacheLargeMessageClient
|
||||
*/
|
||||
|
|
|
@ -41,6 +41,8 @@ public interface ClientSessionInternal extends ClientSession {
|
|||
|
||||
boolean isCompressLargeMessages();
|
||||
|
||||
int getCompressionLevel();
|
||||
|
||||
void expire(ClientConsumer consumer, Message message) throws ActiveMQException;
|
||||
|
||||
void addConsumer(ClientConsumerInternal consumer);
|
||||
|
|
|
@ -1292,6 +1292,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCompressionLevel() {
|
||||
return config.compressionLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerLocatorImpl setCompressionLevel(int compressionLevel) {
|
||||
this.config.compressionLevel = compressionLevel;
|
||||
return this;
|
||||
}
|
||||
|
||||
private void checkWrite() {
|
||||
synchronized (stateGuard) {
|
||||
if (state != null && state != STATE.CLOSED) {
|
||||
|
|
|
@ -117,4 +117,8 @@ public class DeflaterReader extends InputStream {
|
|||
return bytesRead.get();
|
||||
}
|
||||
|
||||
public void setLevel(int level) {
|
||||
deflater.setLevel(level);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -854,6 +854,14 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
|
|||
serverLocator.setCompressLargeMessage(avoidLargeMessages);
|
||||
}
|
||||
|
||||
public int getCompressionLevel() {
|
||||
return serverLocator.getCompressionLevel();
|
||||
}
|
||||
|
||||
public void setCompressionLevel(int compressionLevel) {
|
||||
serverLocator.setCompressionLevel(compressionLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
ServerLocator locator0 = serverLocator;
|
||||
|
|
|
@ -246,6 +246,7 @@ public interface JMSServerManager extends ActiveMQComponent {
|
|||
boolean cacheLargeMessagesClient,
|
||||
int minLargeMessageSize,
|
||||
boolean compressLargeMessage,
|
||||
int compressionLevel,
|
||||
int consumerWindowSize,
|
||||
int consumerMaxRate,
|
||||
int confirmationWindowSize,
|
||||
|
@ -282,6 +283,7 @@ public interface JMSServerManager extends ActiveMQComponent {
|
|||
boolean cacheLargeMessagesClient,
|
||||
int minLargeMessageSize,
|
||||
boolean compressLargeMessages,
|
||||
int compressionLevel,
|
||||
int consumerWindowSize,
|
||||
int consumerMaxRate,
|
||||
int confirmationWindowSize,
|
||||
|
|
|
@ -82,6 +82,10 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
|
|||
|
||||
ConnectionFactoryConfiguration setCompressLargeMessages(boolean avoidLargeMessages);
|
||||
|
||||
int getCompressionLevel();
|
||||
|
||||
ConnectionFactoryConfiguration setCompressionLevel(int compressionLevel);
|
||||
|
||||
int getConsumerWindowSize();
|
||||
|
||||
ConnectionFactoryConfiguration setConsumerWindowSize(int consumerWindowSize);
|
||||
|
|
|
@ -67,6 +67,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
|
|||
|
||||
private boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
|
||||
|
||||
private int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
|
||||
|
||||
private int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
|
||||
|
||||
private int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
|
||||
|
@ -281,6 +283,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCompressionLevel() {
|
||||
return compressionLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionFactoryConfiguration setCompressionLevel(final int compressionLevel) {
|
||||
this.compressionLevel = compressionLevel;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getConsumerWindowSize() {
|
||||
return consumerWindowSize;
|
||||
|
@ -642,6 +655,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
|
|||
enableSharedClientID = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
|
||||
|
||||
useTopologyForLoadBalancing = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
|
||||
|
||||
compressionLevel = buffer.readableBytes() > 0 ? BufferHelper.readNullableInteger(buffer) : ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -738,6 +754,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
|
|||
BufferHelper.writeNullableBoolean(buffer, enableSharedClientID);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, useTopologyForLoadBalancing);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, compressionLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -858,7 +876,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
|
|||
|
||||
BufferHelper.sizeOfNullableBoolean(enableSharedClientID) +
|
||||
|
||||
BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing);
|
||||
BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing) +
|
||||
|
||||
BufferHelper.sizeOfNullableInteger(compressionLevel);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
|
|
@ -872,6 +872,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM
|
|||
final boolean cacheLargeMessagesClient,
|
||||
final int minLargeMessageSize,
|
||||
final boolean compressLargeMessage,
|
||||
final int compressionLevel,
|
||||
final int consumerWindowSize,
|
||||
final int consumerMaxRate,
|
||||
final int confirmationWindowSize,
|
||||
|
@ -917,6 +918,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM
|
|||
final boolean cacheLargeMessagesClient,
|
||||
final int minLargeMessageSize,
|
||||
final boolean compressLargeMessages,
|
||||
final int compressionLevel,
|
||||
final int consumerWindowSize,
|
||||
final int consumerMaxRate,
|
||||
final int confirmationWindowSize,
|
||||
|
@ -943,7 +945,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM
|
|||
checkInitialised();
|
||||
ActiveMQConnectionFactory cf = connectionFactories.get(name);
|
||||
if (cf == null) {
|
||||
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection);
|
||||
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setCompressionLevel(compressionLevel).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection);
|
||||
createConnectionFactory(true, configuration, registryBindings);
|
||||
}
|
||||
}
|
||||
|
@ -1212,6 +1214,7 @@ public class JMSServerManagerImpl extends CleaningActivateCallback implements JM
|
|||
cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
|
||||
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
|
||||
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
|
||||
cf.setCompressionLevel(cfConfig.getCompressionLevel());
|
||||
cf.setGroupID(cfConfig.getGroupID());
|
||||
cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr());
|
||||
cf.setDeserializationBlackList(cfConfig.getDeserializationBlackList());
|
||||
|
|
|
@ -689,6 +689,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
|
|||
mcfProperties.setCompressLargeMessage(compressLargeMessage);
|
||||
}
|
||||
|
||||
public Integer getCompressionLevel() {
|
||||
return mcfProperties.getCompressionLevel();
|
||||
}
|
||||
|
||||
public void setCompressionLevel(final Integer compressionLevel) {
|
||||
mcfProperties.setCompressionLevel(compressionLevel);
|
||||
}
|
||||
|
||||
public Integer getInitialConnectAttempts() {
|
||||
return mcfProperties.getInitialConnectAttempts();
|
||||
}
|
||||
|
|
|
@ -640,6 +640,29 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
|||
raProperties.setCompressLargeMessage(compressLargeMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get compressionLevel
|
||||
*
|
||||
* @return The value
|
||||
*/
|
||||
public Integer getCompressionLevel() {
|
||||
logger.trace("getCompressionLevel()");
|
||||
|
||||
return raProperties.getCompressionLevel();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets what compressionLevel to use when compressing messages
|
||||
* Value must be -1 (default) or 0-9
|
||||
*
|
||||
* @param compressionLevel The value
|
||||
*/
|
||||
public void setCompressionLevel(final Integer compressionLevel) {
|
||||
logger.trace("setCompressionLevel({})", compressionLevel);
|
||||
|
||||
raProperties.setCompressionLevel(compressionLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get call timeout
|
||||
*
|
||||
|
@ -1828,6 +1851,10 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
|||
if (val2 != null) {
|
||||
cf.setInitialMessagePacketSize(val2);
|
||||
}
|
||||
val2 = overrideProperties.getCompressionLevel() != null ? overrideProperties.getCompressionLevel() : raProperties.getCompressionLevel();
|
||||
if (val2 != null) {
|
||||
cf.setCompressionLevel(val2);
|
||||
}
|
||||
|
||||
Long val3 = overrideProperties.getClientFailureCheckPeriod() != null ? overrideProperties.getClientFailureCheckPeriod() : raProperties.getClientFailureCheckPeriod();
|
||||
if (val3 != null) {
|
||||
|
|
|
@ -76,6 +76,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
|
|||
|
||||
private Boolean compressLargeMessage;
|
||||
|
||||
private Integer compressionLevel;
|
||||
|
||||
private Integer consumerWindowSize;
|
||||
|
||||
private Integer producerWindowSize;
|
||||
|
@ -178,6 +180,15 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
|
|||
this.compressLargeMessage = compressLargeMessage;
|
||||
}
|
||||
|
||||
public Integer getCompressionLevel() {
|
||||
return compressionLevel;
|
||||
}
|
||||
|
||||
public void setCompressionLevel(Integer compressionLevel) {
|
||||
hasBeenUpdated = true;
|
||||
this.compressionLevel = compressionLevel;
|
||||
}
|
||||
|
||||
public String getConnectionLoadBalancingPolicyClassName() {
|
||||
logger.trace("getConnectionLoadBalancingPolicyClassName()");
|
||||
|
||||
|
|
|
@ -68,11 +68,20 @@ is transferred to the server's side. Notice that there's no special treatment
|
|||
at the server's side, all the compressing and uncompressing is done at the
|
||||
client.
|
||||
|
||||
This behavior can be tuned further by setting an optional parameter: `compressionLevel`.
|
||||
This will decide how much the message body should be compressed. `compressionLevel`
|
||||
accepts an integer of `-1` or a value between `0-9`. The default value is `-1` which
|
||||
corresponds to around level 6-7.
|
||||
|
||||
If the compressed size of a large message is below `minLargeMessageSize`, it is
|
||||
sent to server as regular messages. This means that the message won't be
|
||||
written into the server's large-message data directory, thus reducing the disk
|
||||
I/O.
|
||||
|
||||
**Note:** A higher `compressionLevel` means the message body will get further compressed,
|
||||
but this is at the cost of speed and computational overhead. Make sure to tune this value
|
||||
according to its specific use-case.
|
||||
|
||||
## Streaming large messages from Core Protocol
|
||||
|
||||
Apache ActiveMQ Artemis supports setting the body of messages using input and
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.zip.Deflater;
|
|||
import io.netty.util.internal.PlatformDependent;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
|
@ -38,6 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -510,6 +513,73 @@ public class LargeMessageCompressTest extends LargeMessageTest {
|
|||
validateNoFilesOnLargeDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeMessageCompressionLevel() throws Exception {
|
||||
|
||||
SimpleString address1 = new SimpleString("address1");
|
||||
SimpleString address2 = new SimpleString("address2");
|
||||
SimpleString address3 = new SimpleString("address3");
|
||||
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
server.start();
|
||||
|
||||
ServerLocator locator1 = ActiveMQClient.createServerLocator("tcp://localhost:61616?compressionLevel=1");
|
||||
ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0?compressionLevel=5");
|
||||
ServerLocator locator3 = ActiveMQClient.createServerLocator("vm://0");
|
||||
locator1.setCompressLargeMessage(true);
|
||||
locator2.setCompressLargeMessage(true);
|
||||
locator3.setCompressLargeMessage(true);
|
||||
locator3.setCompressionLevel(9);
|
||||
|
||||
ClientSessionFactory sf1 = locator1.createSessionFactory();
|
||||
ClientSessionFactory sf2 = locator2.createSessionFactory();
|
||||
ClientSessionFactory sf3 = locator3.createSessionFactory();
|
||||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
ClientSession session2 = sf2.createSession(false, true, true);
|
||||
ClientSession session3 = sf3.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer1 = session1.createProducer(address1);
|
||||
ClientProducer producer2 = session2.createProducer(address2);
|
||||
ClientProducer producer3 = session3.createProducer(address3);
|
||||
|
||||
session1.createQueue(new QueueConfiguration(address1));
|
||||
session2.createQueue(new QueueConfiguration(address2));
|
||||
session3.createQueue(new QueueConfiguration(address3));
|
||||
|
||||
String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
|
||||
for (int i = 0; i < 20; i++) {
|
||||
inputString = inputString + inputString;
|
||||
}
|
||||
|
||||
ClientMessage message = session1.createMessage(true);
|
||||
message.getBodyBuffer().writeString(inputString);
|
||||
producer1.send(message);
|
||||
producer2.send(message);
|
||||
producer3.send(message);
|
||||
|
||||
QueueControl queueControl1 = (QueueControl)server.getManagementService().
|
||||
getResource(ResourceNames.QUEUE + address1);
|
||||
QueueControl queueControl2 = (QueueControl)server.getManagementService().
|
||||
getResource(ResourceNames.QUEUE + address2);
|
||||
QueueControl queueControl3 = (QueueControl)server.getManagementService().
|
||||
getResource(ResourceNames.QUEUE + address3);
|
||||
|
||||
assertTrue(1 == queueControl1.getMessageCount());
|
||||
assertTrue(1 == queueControl2.getMessageCount());
|
||||
assertTrue(1 == queueControl3.getMessageCount());
|
||||
assertTrue(message.getPersistentSize() > queueControl1.getPersistentSize());
|
||||
assertTrue(queueControl1.getPersistentSize() > queueControl2.getPersistentSize());
|
||||
assertTrue(queueControl2.getPersistentSize() > queueControl3.getPersistentSize());
|
||||
|
||||
sf1.close();
|
||||
sf2.close();
|
||||
sf3.close();
|
||||
locator1.close();
|
||||
locator2.close();
|
||||
locator3.close();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testSendServerMessage() throws Exception {
|
||||
|
|
|
@ -149,7 +149,7 @@ public class PreACKJMSTest extends JMSTestBase {
|
|||
|
||||
ArrayList<String> connectors = registerConnectors(server, connectorConfigs);
|
||||
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, connectors, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, true, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, connectors, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, true, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -225,7 +225,7 @@ public class ReSendMessageTest extends JMSTestBase {
|
|||
int reconnectAttempts = -1;
|
||||
int callTimeout = 30000;
|
||||
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,7 +49,7 @@ public class SessionClosedOnRemotingConnectionFailureTest extends JMSTestBase {
|
|||
List<TransportConfiguration> connectorConfigs = new ArrayList<>();
|
||||
connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
|
||||
|
||||
jmsServer.createConnectionFactory("cffoo", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/cffoo");
|
||||
jmsServer.createConnectionFactory("cffoo", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, 0, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/cffoo");
|
||||
|
||||
cf = (ConnectionFactory) namingContext.lookup("/cffoo");
|
||||
|
||||
|
|
|
@ -176,6 +176,6 @@ public class TextMessageTest extends JMSTestBase {
|
|||
int reconnectAttempts = -1;
|
||||
int callTimeout = 30000;
|
||||
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, true, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class InvalidConnectorTest extends JMSTestBase {
|
|||
int reconnectAttempts = -1;
|
||||
int callTimeout = 30000;
|
||||
|
||||
jmsServer.createConnectionFactory("invalid-cf", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/invalid-cf");
|
||||
jmsServer.createConnectionFactory("invalid-cf", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE, ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/invalid-cf");
|
||||
|
||||
ActiveMQConnectionFactory invalidCf = (ActiveMQConnectionFactory) namingContext.lookup("/invalid-cf");
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ public class DivertAndACKClientTest extends JMSTestBase {
|
|||
int reconnectAttempts = -1;
|
||||
int callTimeout = 30000;
|
||||
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, // this test needs to block on ACK
|
||||
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs), null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, callTimeout, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, // this test needs to block on ACK
|
||||
ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND, ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, retryInterval, retryIntervalMultiplier, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, reconnectAttempts, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
}
|
||||
|
||||
|
|
|
@ -277,7 +277,7 @@ public class LocalTestServer implements Server, Runnable {
|
|||
ArrayList<String> connectors = new ArrayList<>();
|
||||
connectors.add("netty");
|
||||
|
||||
getJMSServerManager().createConnectionFactory(objectName, false, type, connectors, clientId, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, prefetchSize, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, blockOnAcknowledge, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, dupsOkBatchSize, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
getJMSServerManager().createConnectionFactory(objectName, false, type, connectors, clientId, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_COMPRESSION_LEVEL, prefetchSize, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, blockOnAcknowledge, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, dupsOkBatchSize, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, jndiBindings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -342,6 +342,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
|
|||
" <config-property-value>false</config-property-value>" +
|
||||
" </config-property>\n" +
|
||||
" <config-property>\n" +
|
||||
" <description>The level of compression to use. Must be -1 or between 0-9</description>" +
|
||||
" <config-property-name>CompressionLevel</config-property-name>" +
|
||||
" <config-property-type>int</config-property-type>" +
|
||||
" <config-property-value>-1</config-property-value>" +
|
||||
" </config-property>\n" +
|
||||
" <config-property>\n" +
|
||||
" <description>The timeout in milliseconds for failover call (or -1 for infinite)</description>\n" +
|
||||
" <config-property-name>CallFailoverTimeout</config-property-name>\n" +
|
||||
" <config-property-type>long</config-property-type>\n" +
|
||||
|
|
Loading…
Reference in New Issue