ARTEMIS-3943 Adjusting defalut address settings to avoid OME from paging and flow control

This commit is contained in:
Clebert Suconic 2022-08-22 14:24:42 -04:00 committed by clebertsuconic
parent aa4c642796
commit d06459df57
47 changed files with 1395 additions and 329 deletions

View File

@ -134,20 +134,27 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>-1</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>-1</max-size-messages>
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
Lower this setting if you have too many queues in memory. -->
<page-size-bytes>10M</page-size-bytes>
<!-- how many messages are kept in memory from paging. The system will stop reading whenever this or max-read-page-bytes hits the max first. -->
<max-read-page-messages>1000</max-read-page-messages>
<!-- how many bytes equivalent of messages are kept in memory from paging (based on memory estimate). The system will stop reading whenever this or max-read-page-messages hits the max first. -->
<max-read-page-bytes>1M</max-read-page-bytes>
<!-- uncomment the next parameter if you want to customize how many messages are read from paging into queues.
-1 means do not set a limit by messages. -->
<!-- <max-read-page-messages>-1</max-read-page-messages> -->
<!-- uncomment the next parameter if you want to customize how much memory is used to keep messages from paging on the queues.
if you set it to -1 it means no limit is set. If both max-read-page-messages and max-read-page-bytes are -1, the system
will just allow infinite reading as long as consumers are issuing credits, independently of the ack state of these messages. -->
<!-- <max-read-page-bytes>1M</max-read-page-bytes> -->
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>${full-policy}</address-full-policy>
<auto-create-queues>${auto-create}</auto-create-queues>

View File

@ -222,8 +222,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages";
private static final String PAGE_FLOW_CONTROL_NAME = "page-flow-control";
private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";
@ -1261,8 +1259,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
} else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxSizeBytes(ByteUtil.convertTextBytes(getTrimmedTextContent(child)));
} else if (PAGE_FLOW_CONTROL_NAME.equalsIgnoreCase(name)) {
addressSettings.setPageFlowControl(XMLUtil.parseBoolean(child));
} else if (MAX_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxSizeMessages(XMLUtil.parseInt(child));
} else if (MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME.equalsIgnoreCase(name)) {
@ -1273,11 +1269,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setPageSizeBytes((int) pageSizeLong);
} else if (MAX_READ_PAGE_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
long maxReadPageMessages = Long.parseLong(getTrimmedTextContent(child));
Validators.POSITIVE_INT.validate(MAX_READ_PAGE_MESSAGES_NODE_NAME, maxReadPageMessages);
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(MAX_READ_PAGE_MESSAGES_NODE_NAME, maxReadPageMessages);
addressSettings.setMaxReadPageMessages((int)maxReadPageMessages);
} else if (MAX_READ_PAGE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
long maxReadPageBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.POSITIVE_INT.validate(MAX_READ_PAGE_BYTES_NODE_NAME, maxReadPageBytes);
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(MAX_READ_PAGE_BYTES_NODE_NAME, maxReadPageBytes);
addressSettings.setMaxReadPageBytes((int)maxReadPageBytes);
} else if (PAGE_MAX_CACHE_SIZE_NODE_NAME.equalsIgnoreCase(name)) {
if (!printPageMaxSizeUsed) {

View File

@ -280,11 +280,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public int getMaxPageReadBytes() {
if (maxPageReadBytes <= 0) {
return pageSize * 2;
} else {
return maxPageReadBytes;
}
return maxPageReadBytes;
}
@Override

View File

@ -2224,5 +2224,4 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224117, value = "\"page-max-cache-size\" being used on broker.xml. This configuration attribute is no longer used and it will be ignored.", format = Message.Format.MESSAGE_FORMAT)
void pageMaxSizeUsed();
}

View File

@ -175,6 +175,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean queueDestroyed = false;
// once we delivered messages from paging, we need to call asyncDelivery upon acks
// if we flow control paging, ack more messages will open the space to deliver more messages
// hence we will need this flag to determine if it was paging before.
private volatile boolean pageDelivered = false;
private final PagingStore pagingStore;
protected final PageSubscription pageSubscription;
@ -3172,8 +3177,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (queueDestroyed) {
return;
}
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) {
scheduleDepage(false);
if (pageIterator != null && pageSubscription.isPaging()) {
// we will issue a delivery runnable to check for released space from acks and resume depage
pageDelivered = true;
if (!depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) {
scheduleDepage(false);
}
} else {
pageDelivered = false;
}
}
@ -3184,14 +3196,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @return
*/
private boolean needsDepage() {
AddressSettings thisSettings = this.addressSettings;
if (thisSettings != null && thisSettings.isPageFlowControl()) {
// if deliveringControl is set, we will use the deliveringMetrics to decide on depaging
// this is particularly needed when paging is used with a client that does not have flow control
// (OpenWire has it usually off by default)
return (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < pageSubscription.getPagingStore().getMaxPageReadBytes() && (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < pageSubscription.getPagingStore().getMaxPageReadMessages();
final int maxReadMessages = pageSubscription.getPagingStore().getMaxPageReadMessages();
final int maxReadBytes = pageSubscription.getPagingStore().getMaxPageReadBytes();
if (maxReadMessages <= 0 && maxReadBytes <= 0) {
// if both maxValues are disabled, we will protect the broker using an older semantic
// where we don't look for deliveringMetrics..
// this would give users a chance to switch to older protection mode.
return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxSize() &&
intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
} else {
return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxPageReadBytes() && queueMemorySize.getElements() < pageSubscription.getPagingStore().getMaxPageReadMessages();
return (maxReadBytes <= 0 || (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
(maxReadMessages <= 0 || (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < maxReadMessages);
}
}
@ -4449,10 +4465,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void decDelivering(final MessageReference reference) {
deliveringMetrics.decrementMetrics(reference);
AddressSettings theSettings = this.addressSettings;
if (theSettings != null && theSettings.isPageFlowControl()) {
deliverAsync(); // we check for async delivery after acks
// in case paging stopped for lack of space
if (pageDelivered) {
/* we check for async delivery after acks
in case paging stopped for lack of space */
deliverAsync();
}
}

View File

@ -42,7 +42,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final long DEFAULT_MAX_SIZE_MESSAGES = -1;
public static final int DEFAULT_MAX_READ_PAGE_MESSAGES = 1000;
public static final int DEFAULT_MAX_READ_PAGE_MESSAGES = -1;
public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY = AddressFullMessagePolicy.PAGE;
@ -88,8 +88,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true;
public static final boolean DEFAULT_PAGE_FLOW_CONTROL = false;
public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true;
public static final long DEFAULT_AUTO_DELETE_ADDRESSES_DELAY = 0;
@ -149,8 +147,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer maxReadPageMessages = null;
private Boolean pageFlowControl = null;
private Long maxSizeMessages = null;
private Integer pageSizeBytes = null;
@ -355,7 +351,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
this.enableIngressTimestamp = other.enableIngressTimestamp;
this.pageFlowControl = other.pageFlowControl;
}
public AddressSettings() {
@ -451,15 +446,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public boolean isPageFlowControl() {
return pageFlowControl != null ? pageFlowControl : AddressSettings.DEFAULT_PAGE_FLOW_CONTROL;
}
public AddressSettings setPageFlowControl(Boolean pageFlowControl) {
this.pageFlowControl = pageFlowControl;
return this;
}
public DeletionPolicy getConfigDeleteQueues() {
return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES;
}
@ -1237,9 +1223,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (enableIngressTimestamp == null) {
enableIngressTimestamp = merged.enableIngressTimestamp;
}
if (pageFlowControl == null) {
pageFlowControl = merged.pageFlowControl;
}
}
@Override
@ -1489,10 +1472,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
maxReadPageMessages = BufferHelper.readNullableInteger(buffer);
}
if (buffer.readableBytes() > 0) {
pageFlowControl = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -1563,8 +1542,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
BufferHelper.sizeOfNullableLong(maxSizeMessages) +
BufferHelper.sizeOfNullableInteger(maxReadPageMessages) +
BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
BufferHelper.sizeOfNullableBoolean(pageFlowControl);
BufferHelper.sizeOfNullableInteger(maxReadPageBytes);
}
@Override
@ -1704,8 +1682,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableInteger(buffer, maxReadPageBytes);
BufferHelper.writeNullableInteger(buffer, maxReadPageMessages);
BufferHelper.writeNullableBoolean(buffer, pageFlowControl);
}
/* (non-Javadoc)
@ -1782,7 +1758,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode());
result = prime * result + ((pageFlowControl == null) ? 0 : pageFlowControl.hashCode());
return result;
}
@ -2155,12 +2130,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!maxSizeMessages.equals(other.maxSizeMessages))
return false;
if (pageFlowControl == null) {
if (other.pageFlowControl != null)
return false;
} else if (!pageFlowControl.equals(other.pageFlowControl))
return false;
return true;
}
@ -2298,7 +2267,6 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
enableMetrics +
", enableIngressTime=" +
enableIngressTimestamp +
", deliveringControl=" + pageFlowControl +
"]";
}
}

View File

@ -4006,15 +4006,6 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="page-flow-control" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If this is tru the system will check for deliverin statistics before issuing more depaging. This is particularly useful when you either want to ensure enough memory to read from paging
or if you are using a client that does not allow flow control.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="address-full-policy" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -299,30 +299,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
assertEquals(ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), bconfig.getProducerWindowSize());
}
@Test
public void testParseAddressSettingsFlowControl() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = "<core xmlns=\"urn:activemq:core\">\n" +
" <address-settings>\n" +
" <!-- if you define auto-create on certain queues, management has to be auto-create -->\n" +
" <address-setting match=\"hello\">\n" +
" <page-flow-control>true</page-flow-control>\n" +
" </address-setting>\n" +
" </address-settings>\n" +
"</core>";
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
Map<String, AddressSettings> addressSettings = config.getAddressSettings();
assertEquals(1, addressSettings.size());
AddressSettings settings = addressSettings.get("hello");
Assert.assertNotNull(settings);
Assert.assertTrue(settings.isPageFlowControl());
}
@Test
public void testParsingOverflowPageSize() throws Exception {
testParsingOverFlow("<address-settings>" + "\n" + "<address-setting match=\"#\">" + "\n" + "<page-size-bytes>2147483648</page-size-bytes>\n" + "</address-setting>" + "\n" + "</address-settings>" + "\n");
@ -407,6 +383,19 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
Assert.assertEquals(33, settings.getMaxReadPageMessages());
}
@Test
public void testParseMaxReadAddressSettingsAllNegative() throws Exception {
String configStr = "<configuration><address-settings>" + "\n" + "<address-setting match=\"foo\">" + "\n" + "<max-read-page-bytes>-1</max-read-page-bytes><max-read-page-messages>-1</max-read-page-messages>.\n" + "</address-setting>" + "\n" + "</address-settings></configuration>" + "\n";
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration configuration = parser.parseMainConfig(input);
AddressSettings settings = configuration.getAddressSettings().get("foo");
Assert.assertEquals(-1, settings.getMaxReadPageBytes());
Assert.assertEquals(-1, settings.getMaxReadPageMessages());
}
// you should not use K, M notations on address settings max-size-messages
@Test
public void testExpectedErrorOverMaxMessageNotation() throws Exception {

View File

@ -1484,52 +1484,74 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize) {
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize) {
return createServer(realFiles, configuration, pageSize, maxAddressSize, (Map<String, AddressSettings>) null);
}
protected ActiveMQServer createServer(final boolean realFiles,
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final int maxReadMessages,
final int maxReadBytes) {
return createServer(realFiles, configuration, pageSize, maxAddressSize, maxReadMessages, maxReadBytes, (Map<String, AddressSettings>) null);
}
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
return createServer(realFiles, configuration, pageSize, maxAddressSize, null, null, settings);
}
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
if (maxReadPageBytes != null) {
setting.getValue().setMaxReadPageBytes(maxReadPageBytes.intValue());
}
if (maxReadPageMessages != null) {
setting.getValue().setMaxReadPageMessages(maxReadPageMessages.intValue());
}
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
if (maxReadPageBytes != null) {
defaultSetting.setMaxReadPageBytes(maxReadPageBytes.intValue());
}
if (maxReadPageMessages != null) {
defaultSetting.setMaxReadPageMessages(maxReadPageMessages.intValue());
}
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
applySettings(server, configuration, pageSize, maxAddressSize, maxReadPageMessages, maxReadPageBytes, settings);
return server;
}
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final int maxReadPageMessages,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageMessages(1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) {
}
protected final ActiveMQServer createServer(final boolean realFiles,
@ -1541,7 +1563,7 @@ public abstract class ActiveMQTestBase extends Assert {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
setDBStoreType(configuration);
}
return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
return createServer(realFiles, configuration, pageSize, maxAddressSize, -1, -1, settings);
}
protected final ActiveMQServer createServer(final boolean realFiles) throws Exception {
@ -1549,7 +1571,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected final ActiveMQServer createServer(final boolean realFiles, final boolean netty) throws Exception {
return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, -1, -1);
}
protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) {
@ -1595,7 +1617,7 @@ public abstract class ActiveMQTestBase extends Assert {
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(pageSize);
defaultSetting.setMaxSizeBytes(maxAddressSize);
defaultSetting.setMaxSizeBytes(maxAddressSize).setMaxReadPageBytes(-1).setMaxSizeBytes(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);

View File

@ -96,7 +96,7 @@ Property Name|Description|Default
`max-size-messages`|The max number of messages the address could have before entering on page mode.| -1 (disabled)
`page-size-bytes`|The size of each page file used on the paging system|10MB
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE`
`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. | 1000
`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. | -1
`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. | 2 * page-size-bytes
### max-size-bytes and max-size-messages simultaneous usage
@ -107,6 +107,8 @@ It is possible to define max-size-messages (as the maximum number of messages) a
`max-read-page-messages` and `max-read-page-bytes` are used to control messaging reading from paged file into the Queue. The broker will add messages on the Queue until either `max-read-page-meessages` or `max-read-page-bytes` reaches the limit.
If both values are set to -1 the broker will keep reading messages as long as the consumer is reaching for more messages. However this would keep the broker unprotected from consumers allocating huge transactions or consumers that don't have flow control enabled.
## Global Max Size
Beyond the `max-size-bytes` on the address you can also set the global-max-size

View File

@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -83,24 +82,15 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
}
@Override
protected ActiveMQServer createServer(final boolean realFiles,
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer pageSize1,
final Integer pageSize2,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedeliveryDelay(0).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
}
@Override

View File

@ -66,14 +66,14 @@ public class PagedMirrorTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 * 1024);
server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 * 1024, -1, -1);
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 1024);
server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 1024, -1, -1);
server2.getConfiguration().getAcceptorConfigurations().clear();
server2.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61617");
brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);

View File

@ -51,7 +51,7 @@ public class AmqpFilterChangePageTest extends ActiveMQTestBase {
int NUMBER_OF_MESSAGES = 2000;
server = createServer(true, config, 100 * 1024, 1024 * 1024);
server = createServer(true, config, 100 * 1024, 1024 * 1024, -1, -1);
server.start();
server.addAddressInfo(new AddressInfo("AD1").addRoutingType(RoutingType.MULTICAST));

View File

@ -50,7 +50,7 @@ public class AmqpMaxReadPagingTest extends AmqpClientTestSupport {
.getAddressSettings();
addressesSettings.get("#").setMaxSizeMessages(1)
.setMaxSizeBytes(100000)
.setPageSizeBytes(10000).setMaxReadPageMessages(10).setMaxReadPageBytes(10 * 1024 * 1024).setPageFlowControl(true);
.setPageSizeBytes(10000).setMaxReadPageMessages(10).setMaxReadPageBytes(10 * 1024 * 1024);
server.getConfiguration().setMessageExpiryScanPeriod(-1);
}

View File

@ -42,7 +42,7 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(true);
AddressSettings setting = new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("exp")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(100 * 1024).setMaxSizeBytes(200 * 1024);
AddressSettings setting = new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("exp")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(100 * 1024).setMaxSizeBytes(200 * 1024).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getConfiguration().setJournalSyncNonTransactional(false);
server.getConfiguration().setMessageExpiryScanPeriod(-1);
server.getConfiguration().setJournalSyncTransactional(false);
@ -106,8 +106,6 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
assertNull(cons.receiveImmediate());
cons.close();
Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
cons = session.createConsumer("exp");
for (int i = 0; i < NUMBER_OF_EXPIRED_MESSAGES; i++) {
ClientMessage msg = cons.receive(5000);
@ -131,6 +129,8 @@ public class ExpireTestOnRestartTest extends ActiveMQTestBase {
session.close();
locator.close();
Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
}
}

View File

@ -68,7 +68,7 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.getConfiguration().setMessageExpiryScanPeriod(600000);
AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ);
AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
@ -262,7 +262,7 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
server.getConfiguration().setMessageExpiryScanPeriod(6000);
AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ);
AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ).setMaxReadPageMessages(-1).setMaxReadPageBytes(-1);
server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);

View File

@ -91,7 +91,6 @@ public class JmsNettyNioStressTest extends ActiveMQTestBase {
Configuration config = createBasicConfig().setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(transportConfig);
ActiveMQServer server = createServer(true, config);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(AddressSettings.DEFAULT_PAGE_SIZE).
setMaxSizeBytes(AddressSettings.DEFAULT_MAX_SIZE_BYTES).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).

View File

@ -204,7 +204,7 @@ public class MultipleThreadFilterOneTest extends ActiveMQTestBase {
ActiveMQServer server;
if (isPaging) {
server = createServer(true, createDefaultConfig(isNetty), PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, createDefaultConfig(isNetty), PAGE_SIZE, PAGE_MAX, -1, -1, new HashMap<String, AddressSettings>());
} else {
server = createServer(true, isNetty);
}

View File

@ -22,9 +22,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Test;
@ -40,28 +38,16 @@ public class ExclusiveDivertWithClusterTest extends ClusterTestBase {
}
@Override
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer pageSize1,
final Integer pageSize2,
final Map<String, AddressSettings> settings) {
DivertConfiguration divertConf = new DivertConfiguration().setName("notifications-divert").setAddress("*.Provider.*.Agent.*.Status").setForwardingAddress("Notifications").setExclusive(true);
configuration.addDivertConfiguration(divertConf);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setRedeliveryDelay(0).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
}
@Test

View File

@ -165,7 +165,7 @@ public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addrSettings.setPageSizeBytes(524288);
addrSettings.setMessageCounterHistoryDayLimit(10);
addrSettings.setRedistributionDelay(1000);
addrSettings.setRedistributionDelay(1000).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
}
}

View File

@ -323,15 +323,7 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertNull(queueControl.getDeadLetterAddress());
server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings() {
private static final long serialVersionUID = -4919035864731465338L;
@Override
public SimpleString getDeadLetterAddress() {
return deadLetterAddress;
}
});
server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDeadLetterAddress(deadLetterAddress));
Assert.assertEquals(deadLetterAddress.toString(), queueControl.getDeadLetterAddress());
session.deleteQueue(queue);

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@ -42,7 +43,6 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -69,13 +70,18 @@ public class GlobalPagingTest extends PagingTest {
super.setUp();
}
boolean customServerCreated = true;
@Override
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) {
super.applySettings(server, configuration, pageSize, maxAddressSize, maxReadPageMessages, maxReadPageBytes, settings);
customServerCreated = true;
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
@ -84,11 +90,9 @@ public class GlobalPagingTest extends PagingTest {
}
server.getConfiguration().setGlobalMaxSize(maxAddressSize);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageMessages(-1).setMaxReadPageBytes(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
}
// test doesn't make sense on GlobalPaging due to configuration issues
@ -98,13 +102,14 @@ public class GlobalPagingTest extends PagingTest {
@Test
public void testPagingOverFullDisk() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
Assume.assumeTrue(storeType != StoreConfiguration.StoreType.DATABASE);
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1, new HashMap<>());
Assert.assertTrue(customServerCreated);
server.getConfiguration().setGlobalMaxSize(-1);
server.getConfiguration().setAddressQueueScanPeriod(100);
@ -153,7 +158,7 @@ public class GlobalPagingTest extends PagingTest {
sendFewMessages(numberOfMessages, session, producer, body);
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();
e.printStackTrace(System.out);
}
}
};

View File

@ -79,12 +79,12 @@ public class IndividualAckPagingTest extends ActiveMQTestBase {
if (paging) {
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
} else {
server = createServer(true, config, 10 * 1024 * 1024, -1);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024 * 1024).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024 * 1024).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
}

View File

@ -79,7 +79,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, new HashMap<String, AddressSettings>());
server.start();
@ -393,7 +393,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, new HashMap<String, AddressSettings>());
server.start();
@ -478,7 +478,7 @@ public class PagingOrderTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1, new HashMap<String, AddressSettings>());
server.start();

View File

@ -40,7 +40,7 @@ public class PagingSizeWildcardTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, 200, 400);
ActiveMQServer server = createServer(true, config, 200, 400, -1, -1, null);
server.start();
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
@ -84,7 +84,7 @@ public class PagingSizeWildcardTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, 200, 400);
ActiveMQServer server = createServer(true, config, 200, 400, -1, -1, null);
server.start();
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));

View File

@ -218,7 +218,7 @@ public class PagingTest extends ActiveMQTestBase {
final int PAGE_SIZE = 10 * 1024;
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1);
server.start();
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
@ -513,7 +513,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -923,11 +923,12 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 1, new HashMap<>());
// one message max, maxReadPageBytes disabled
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 1, -1, new HashMap<>());
server.start();
final int numberOfMessages = 100;
final int numberOfMessages = 10;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
@ -980,9 +981,9 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertNotNull(messReceived);
System.out.println("Receiving " + messReceived);
messReceived.acknowledge();
session.commit();
}
consumer.close();
session.commit();
Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
Wait.assertEquals(1, () -> PagingStoreTestAccessor.getUsedPagesSize(queue.getPagingStore()), 1000, 100);
@ -1152,7 +1153,7 @@ public class PagingTest extends ActiveMQTestBase {
String address = "testSimpleResume";
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -1593,7 +1594,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -1994,9 +1995,9 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact
.setMessageExpiryScanPeriod(10);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setExpiryAddress(new SimpleString("EXP")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setExpiryAddress(new SimpleString("EXP")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().clear();
@ -2245,7 +2246,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -2300,7 +2301,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
locator = createInVMNonHALocator();
@ -2496,7 +2497,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -2546,7 +2547,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
locator = createInVMNonHALocator();
@ -3228,7 +3229,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -3796,7 +3797,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -4018,7 +4019,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -4109,7 +4110,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalSyncTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -5471,7 +5472,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024);
server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2);
server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2, -1, -1);
server.start();
@ -5570,7 +5571,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -5652,7 +5653,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -5818,7 +5819,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -5922,7 +5923,7 @@ public class PagingTest extends ActiveMQTestBase {
AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA")).setRedeliveryDelay(0);
settings.put(ADDRESS.toString(), dla);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1, settings);
server.start();
@ -6133,7 +6134,7 @@ public class PagingTest extends ActiveMQTestBase {
AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA")).setExpiryAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1, settings);
server.start();
@ -6852,7 +6853,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -7599,7 +7600,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
server.start();
@ -7796,14 +7797,16 @@ public class PagingTest extends ActiveMQTestBase {
session.close();
}
@Override
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
final long maxAddressSize) {
ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize);
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) {
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override

View File

@ -49,7 +49,7 @@ public class SpawnedServerSupport {
}
static Configuration createConfig(String folder) {
AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(10 * 1024).setMaxSizeBytes(100 * 1024);
AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(10 * 1024).setMaxSizeBytes(100 * 1024).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
Configuration config = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(ActiveMQTestBase.getDefaultJournalType()).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(ActiveMQTestBase.CLUSTER_PASSWORD).setJournalDirectory(ActiveMQTestBase.getJournalDir(folder, 0, false)).setBindingsDirectory(ActiveMQTestBase.getBindingsDir(folder, 0, false)).setPagingDirectory(ActiveMQTestBase.getPageDir(folder, 0, false)).setLargeMessagesDirectory(ActiveMQTestBase.getLargeMessagesDir(folder, 0, false)).setPersistenceEnabled(true).addAddressSetting("#", settings).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.NETTY_ACCEPTOR_FACTORY));

View File

@ -46,17 +46,11 @@ public class TestDeadlockOnPurgePagingTest extends ActiveMQTestBase {
protected ServerLocator locator;
protected ActiveMQServer server;
protected ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
static final int LARGE_MESSAGE_SIZE = 100 * 1024;
protected static final int RECEIVE_TIMEOUT = 5000;
protected static final int PAGE_MAX = 100 * 1024;
protected static final int PAGE_SIZE = 10 * 1024;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@Override
@Before
public void setUp() throws Exception {
@ -72,7 +66,7 @@ public class TestDeadlockOnPurgePagingTest extends ActiveMQTestBase {
Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, TestDeadlockOnPurgePagingTest.PAGE_SIZE, TestDeadlockOnPurgePagingTest.PAGE_MAX);
server = createServer(true, config, TestDeadlockOnPurgePagingTest.PAGE_SIZE, TestDeadlockOnPurgePagingTest.PAGE_MAX, -1, -1);
server.start();

View File

@ -722,7 +722,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
server = createServer(true);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
@ -792,7 +792,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
server = createServer(true);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
@ -855,7 +855,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
server = createServer(true);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
@ -1146,7 +1146,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
//Create ANYCAST queue and set "AutoCreateDeadLetterResources"
//Send message with ANYCAST RoutingType
server.getAddressSettingsRepository().addMatch(myAddress.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true).setDeadLetterQueuePrefix(dlaPrefix));
server.getAddressSettingsRepository().addMatch(myAddress.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true).setDeadLetterQueuePrefix(dlaPrefix).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1));
session.createQueue(new QueueConfiguration(myQueue).setAddress(myAddress).setDurable(true).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(myAddress);

View File

@ -75,9 +75,13 @@ public class OpenwirePluginTest extends BasicOpenWireTest {
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, int pageSize,
long maxAddressSize, Map<String, AddressSettings> settings) {
ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
final int pageSize,
final long maxAddressSize,
final Integer maxReadPageMessages,
final Integer maxReadPageBytes,
final Map<String, AddressSettings> settings) {
server.registerBrokerPlugin(verifier);
server.registerBrokerPlugin(new ActiveMQServerPlugin() {
@ -93,10 +97,8 @@ public class OpenwirePluginTest extends BasicOpenWireTest {
}
});
configuration.getAddressSettings().put("autoCreated", new AddressSettings().setAutoDeleteAddresses(true)
.setAutoDeleteQueues(true).setAutoCreateQueues(true).setAutoCreateAddresses(true));
return server;
server.getAddressSettingsRepository().addMatch("autoCreated", new AddressSettings().setAutoDeleteAddresses(true)
.setAutoDeleteQueues(true).setAutoCreateQueues(true).setAutoCreateAddresses(true).setMaxReadPageMessages(-1).setMaxReadPageBytes(-1));
}
@Test(timeout = 10000)

View File

@ -27,6 +27,7 @@ import javax.jms.Topic;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -88,7 +89,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, createDefaultInVMConfig());
server = createServer(true, createDefaultInVMConfig(), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, -1, -1, new HashMap<>());
server.getConfiguration().setInternalNamingPrefix(ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX.replace('.', delimiterChar));
server.getConfiguration().getWildcardConfiguration().setDelimiter(delimiterChar);
server.start();

View File

@ -171,6 +171,8 @@ under the License.
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>-1</max-read-page-bytes>
</address-setting>
</address-settings>

View File

@ -171,6 +171,8 @@ under the License.
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>-1</max-read-page-bytes>
</address-setting>
</address-settings>

View File

@ -103,6 +103,9 @@ under the License.
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>-1</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>

View File

@ -104,6 +104,8 @@ under the License.
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>-1</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>

View File

@ -107,6 +107,48 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-flow-paging</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>admin</user>
<password>admin</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/flowControlPaging</instance>
<configuration>${basedir}/target/classes/servers/flowControlPaging</configuration>
<args>
<arg>--java-memory</arg>
<!-- some limited memory to make it more likely to fail -->
<arg>512M</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-subscription-paging</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>admin</user>
<password>admin</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/subscriptionPaging</instance>
<configuration>${basedir}/target/classes/servers/subscriptionPaging</configuration>
<args>
<arg>--java-memory</arg>
<!-- some limited memory to make it more likely to fail -->
<arg>512M</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,257 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<!-- if you want to retain your journal uncomment this following configuration.
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
it is recommended to use a separate storage unit from the journal for performance considerations.
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size> -->
<!-- the maximum number of messages accepted before entering full address mode.
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
<global-max-messages>-1</global-max-messages>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>0</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>0</max-size-messages>
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
Lower this setting if you have too many queues in memory. -->
<!-- uncomment the next parameter if you want to customize how many messages are read from paging into queues.
-1 means do not set a limit by messages. -->
<!-- <max-read-page-messages>-1</max-read-page-messages> -->
<!-- uncomment the next parameter if you want to customize how much memory is used to keep messages from paging on the queues.
if you set it to -1 it means no limit is set. If both max-read-page-messages and max-read-page-bytes are -1, the system
will just allow infinite reading as long as consumers are issuing credits, independently of the ack state of these messages. -->
<!-- <max-read-page-bytes>1M</max-read-page-bytes> -->
<!-- The system will not read any more messages if messages are pending acks.
if you set this attribute to false, the broker will allow more reading as soon as messages become on "delivering" state. -->
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -221,17 +221,16 @@ under the License.
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>-1</max-size-bytes>
<max-size-bytes>0</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>0</max-size-messages>
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
Lower this setting if you have too many queues in memory. -->
<page-size-bytes>5M</page-size-bytes>
<!-- how many messages are kept in memory from paging. The system will stop reading whenever this or max-read-page-bytes hits the max first. -->
<max-read-page-messages>50</max-read-page-messages>
<page-flow-control>true</page-flow-control>
<max-read-page-messages>-1</max-read-page-messages>
<!-- how many bytes equivalent of messages are kept in memory from paging (based on memory estimate). The system will stop reading whenever this or max-read-page-messages hits the max first. -->
<max-read-page-bytes>500K</max-read-page-bytes>
<max-read-page-bytes>1M</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>

View File

@ -0,0 +1,247 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<!-- if you want to retain your journal uncomment this following configuration.
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
it is recommended to use a separate storage unit from the journal for performance considerations.
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>
You can also enable retention by using the argument journal-retention on the `artemis create` command -->
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size> -->
<!-- the maximum number of messages accepted before entering full address mode.
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
<global-max-messages>-1</global-max-messages>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging
based on the first attribute to hits the maximum value -->
<!-- limit for the address in bytes, -1 means unlimited -->
<max-size-bytes>0</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>0</max-size-messages>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="SUB_TEST">
<multicast/>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -36,16 +36,37 @@ import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
public class SoakTestBase extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SoakTestBase.class);
Set<Process> processes = new HashSet<>();
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT = 10099;
public static final String basedir = System.getProperty("basedir");
protected static void unzip(File zipFile, File serverFolder) throws IOException, ClassNotFoundException, InterruptedException {
ProcessBuilder zipBuilder = new ProcessBuilder("unzip", zipFile.getAbsolutePath()).directory(serverFolder);
Process process = zipBuilder.start();
SpawnedVMSupport.startLogger("zip", process);
logger.info("Zip finished with " + process.waitFor());
}
protected static void zip(File zipFile, File serverFolder) throws IOException, ClassNotFoundException, InterruptedException {
logger.info("Zipping data folder for " + zipFile);
ProcessBuilder zipBuilder = new ProcessBuilder("zip", "-r", zipFile.getAbsolutePath(), "data").directory(serverFolder);
Process process = zipBuilder.start();
SpawnedVMSupport.startLogger("zip", process);
logger.info("Zip finished with " + process.waitFor());
}
@After
public void after() throws Exception {
for (Process process : processes) {

View File

@ -26,7 +26,11 @@ public class TestParameters {
private static final Logger logger = Logger.getLogger(TestParameters.class);
private static String propertyName(String testName, String property) {
return "TEST_" + testName + "_" + property;
if (testName == null) {
return "TEST_" + property;
} else {
return "TEST_" + testName + "_" + property;
}
}
public static int testProperty(String testName, String property, int defaultValue) {

View File

@ -23,7 +23,7 @@ import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty
public class RandomFailoverSoakTest extends RandomReattachTest {
private static final String TEST_NAME = "RANDOM";
public static final int TEST_REPETITION = testProperty(TEST_NAME, "TEST_REPETITION", 100);
public static final int TEST_REPETITION = testProperty(TEST_NAME, "TEST_REPETITION", 10);
@Override
protected int getNumIterations() {

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_FLOW_ZIP_LOCATION=a folder */
@RunWith(Parameterized.class)
public class FlowControlPagingTest extends SoakTestBase {
private static final String TEST_NAME = "FLOW";
private final String protocol;
private static final String ZIP_LOCATION = testProperty(null, "ZIP_LOCATION", null);
private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000);
private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120);
private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "CORE"); // this test was about CORE mainly
private static final int PRINT_INTERVAL = testProperty(TEST_NAME, "PRINT_INTERVAL", 100);
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
private final int MESSAGES;
private final int COMMIT_INTERVAL;
// if 0 will use AUTO_ACK
private final int RECEIVE_COMMIT_INTERVAL;
private final int MESSAGE_SIZE;
private static final Logger logger = Logger.getLogger(FlowControlPagingTest.class);
public static final String SERVER_NAME_0 = "flowControlPaging";
@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> parameters() {
String[] protocols = PROTOCOL_LIST.split(",");
ArrayList<Object[]> parameters = new ArrayList<>();
for (String str : protocols) {
logger.info("Adding " + str + " to the list for the test");
parameters.add(new Object[]{str});
}
return parameters;
}
public FlowControlPagingTest(String protocol) {
this.protocol = protocol;
MESSAGES = testProperty(TEST_NAME, protocol + "_MESSAGES", 10000);
COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_COMMIT_INTERVAL", 1000);
// if 0 will use AUTO_ACK
RECEIVE_COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_RECEIVE_COMMIT_INTERVAL", 1);
MESSAGE_SIZE = testProperty(TEST_NAME, protocol + "_MESSAGE_SIZE", 30000);
}
Process serverProcess;
boolean unzipped = false;
private String getZipName() {
return "flow-data-" + protocol + "-" + MESSAGES + "-" + MESSAGE_SIZE + ".zip";
}
@Before
public void before() throws Exception {
Assume.assumeTrue(TEST_ENABLED);
cleanupData(SERVER_NAME_0);
boolean useZip = ZIP_LOCATION != null;
String zipName = getZipName();
File zipFile = useZip ? new File(ZIP_LOCATION + "/" + zipName) : null;
if (ZIP_LOCATION != null && zipFile.exists()) {
unzipped = true;
unzip(zipFile, new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
}
@Test
public void testFlow() throws Exception {
String QUEUE_NAME = "QUEUE_FLOW";
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
AtomicInteger errors = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(1);
runAfter(service::shutdownNow);
if (!unzipped) {
Connection connection = factory.createConnection();
runAfter(connection::close);
String text;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < MESSAGE_SIZE) {
buffer.append("a big string...");
}
text = buffer.toString();
}
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
logger.info("*******************************************************************************************************************************\ndestination " + queue.getQueueName());
MessageProducer producer = session.createProducer(queue);
for (int m = 0; m < MESSAGES; m++) {
TextMessage message = session.createTextMessage(text);
message.setIntProperty("m", m);
producer.send(message);
if (m > 0 && m % COMMIT_INTERVAL == 0) {
logger.info("Sent " + m + " " + protocol + " messages on queue " + queue.getQueueName());
session.commit();
}
}
session.commit();
session.close();
connection.close();
killServer(serverProcess);
}
if (ZIP_LOCATION != null && !unzipped) {
String fileName = getZipName();
zip(new File(ZIP_LOCATION, fileName), new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
final ConnectionFactory factoryClient;
if (protocol.equals("CORE")) {
factoryClient = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616?consumerWindowSize=-1");
} else if (protocol.equals("AMQP")) {
factoryClient = CFUtil.createConnectionFactory("AMQP", "amqp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100000");
} else if (protocol.equals("OPENWIRE")) {
factoryClient = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616"); // no flow control on openwire by default
} else {
factoryClient = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616"); // no flow control on openwire by default
}
Connection connectionConsumer = factoryClient.createConnection();
runAfter(connectionConsumer::close);
AtomicInteger completedFine = new AtomicInteger(0);
service.execute(() -> {
try {
Session sessionConsumer;
if (RECEIVE_COMMIT_INTERVAL <= 0) {
sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
} else {
sessionConsumer = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED);
}
MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue(QUEUE_NAME));
for (int m = 0; m < MESSAGES; m++) {
TextMessage message = (TextMessage) messageConsumer.receive(5_000);
if (message == null) {
sessionConsumer.commit();
System.out.println("TX was flow controlled, no message received, consider lowering your TX interval, m = " + m);
m--;
continue;
}
Assert.assertEquals(m, message.getIntProperty("m"));
// The sending commit interval here will be used for printing
if (PRINT_INTERVAL > 0 && m % PRINT_INTERVAL == 0) {
logger.info("Destination " + QUEUE_NAME + " received " + m + " " + protocol + " messages");
}
if (RECEIVE_COMMIT_INTERVAL > 0 && (m + 1) % RECEIVE_COMMIT_INTERVAL == 0) {
sessionConsumer.commit();
}
}
if (RECEIVE_COMMIT_INTERVAL > 0) {
sessionConsumer.commit();
}
completedFine.incrementAndGet();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
}
});
connectionConsumer.start();
service.shutdown();
Assert.assertTrue("Test Timed Out", service.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
connectionConsumer.close();
}
}

View File

@ -35,9 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -45,13 +45,8 @@ import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/** It is recommended to set the following System properties before running this test:
*
* export TEST_HORIZONTAL_DESTINATIONS=500
* export TEST_HORIZONTAL_MESSAGES=500
* export TEST_HORIZONTAL_COMMIT_INTERVAL=100
* export TEST_HORIZONTAL_SIZE=60000
*
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_HORIZONTAL_ZIP_LOCATION=a folder
* */
@ -61,7 +56,8 @@ public class HorizontalPagingTest extends SoakTestBase {
private static final String TEST_NAME = "HORIZONTAL";
private final String protocol;
private static final String ZIP_LOCATION = testProperty(TEST_NAME, "ZIP_LOCATION", null);
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
private static final String ZIP_LOCATION = testProperty(null, "ZIP_LOCATION", null);
private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000);
private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120);
private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE,CORE,AMQP");
@ -109,11 +105,12 @@ public class HorizontalPagingTest extends SoakTestBase {
boolean unzipped = false;
private String getZipName() {
return "data-" + protocol + "-" + DESTINATIONS + "-" + MESSAGES + "-" + MESSAGE_SIZE + ".zip";
return "horizontal-" + protocol + "-" + DESTINATIONS + "-" + MESSAGES + "-" + MESSAGE_SIZE + ".zip";
}
@Before
public void before() throws Exception {
Assume.assumeTrue(TEST_ENABLED);
cleanupData(SERVER_NAME_0);
boolean useZip = ZIP_LOCATION != null;
@ -122,12 +119,7 @@ public class HorizontalPagingTest extends SoakTestBase {
if (ZIP_LOCATION != null && zipFile.exists()) {
unzipped = true;
System.out.println("Invoking unzip");
ProcessBuilder zipBuilder = new ProcessBuilder("unzip", zipFile.getAbsolutePath()).directory(new File(getServerLocation(SERVER_NAME_0)));
Process process = zipBuilder.start();
SpawnedVMSupport.startLogger("zip", process);
System.out.println("Zip finished with " + process.waitFor());
unzip(zipFile, new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
@ -168,7 +160,9 @@ public class HorizontalPagingTest extends SoakTestBase {
logger.info("*******************************************************************************************************************************\ndestination " + queue.getQueueName());
MessageProducer producer = session.createProducer(queue);
for (int m = 0; m < MESSAGES; m++) {
producer.send(session.createTextMessage(text));
TextMessage message = session.createTextMessage(text);
message.setIntProperty("m", m);
producer.send(message);
if (m > 0 && m % COMMIT_INTERVAL == 0) {
logger.info("Sent " + m + " " + protocol + " messages on queue " + queue.getQueueName());
session.commit();
@ -198,13 +192,10 @@ public class HorizontalPagingTest extends SoakTestBase {
}
if (ZIP_LOCATION != null && !unzipped) {
String fileName = getZipName();
logger.info("Zipping data folder for " + protocol + " as " + fileName);
ProcessBuilder zipBuilder = new ProcessBuilder("zip", "-r", ZIP_LOCATION + "/" + getZipName(), "data").directory(new File(getServerLocation(SERVER_NAME_0)));
Process process = zipBuilder.start();
SpawnedVMSupport.startLogger("zip", process);
System.out.println("Zip finished with " + process.waitFor());
zip(new File(ZIP_LOCATION, fileName), new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
@ -240,6 +231,8 @@ public class HorizontalPagingTest extends SoakTestBase {
logger.info("Destination " + destination + " received " + m + " " + protocol + " messages");
}
Assert.assertEquals(m, message.getIntProperty("m"));
if (RECEIVE_COMMIT_INTERVAL > 0 && (m + 1) % RECEIVE_COMMIT_INTERVAL == 0) {
sessionConsumer.commit();
}

View File

@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters-paging.sh for suggested parameters
* #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging
* export TEST_FLOW_ZIP_LOCATION=a folder */
@RunWith(Parameterized.class)
public class SubscriptionPagingTest extends SoakTestBase {
private static final String TEST_NAME = "SUBSCRIPTION";
private final String protocol;
private static final String ZIP_LOCATION = testProperty(null, "ZIP_LOCATION", null);
private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000);
private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120);
private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "CORE");
private static final int PRINT_INTERVAL = testProperty(TEST_NAME, "PRINT_INTERVAL", 100);
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
private final int MESSAGES;
private final int COMMIT_INTERVAL;
// if 0 will use AUTO_ACK
private final int RECEIVE_COMMIT_INTERVAL;
private final int MESSAGE_SIZE;
private final int SLOW_SUBSCRIPTIONS;
private final int SLEEP_SLOW;
final String TOPIC_NAME = "SUB_TEST";
private static final Logger logger = Logger.getLogger(SubscriptionPagingTest.class);
public static final String SERVER_NAME_0 = "subscriptionPaging";
@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> parameters() {
String[] protocols = PROTOCOL_LIST.split(",");
ArrayList<Object[]> parameters = new ArrayList<>();
for (String str : protocols) {
logger.info("Adding " + str + " to the list for the test");
parameters.add(new Object[]{str});
}
return parameters;
}
public SubscriptionPagingTest(String protocol) {
this.protocol = protocol;
MESSAGES = testProperty(TEST_NAME, protocol + "_MESSAGES", 10000);
COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_COMMIT_INTERVAL", 1000);
// if 0 will use AUTO_ACK
RECEIVE_COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_RECEIVE_COMMIT_INTERVAL", 0);
MESSAGE_SIZE = testProperty(TEST_NAME, protocol + "_MESSAGE_SIZE", 30000);
SLOW_SUBSCRIPTIONS = testProperty(TEST_NAME, "SLOW_SUBSCRIPTIONS", 1);
SLEEP_SLOW = testProperty(TEST_NAME, "SLEEP_SLOW", 1000);
}
Process serverProcess;
boolean unzipped = false;
private String getZipName() {
return "subscription-" + protocol + "-" + MESSAGES + "-" + MESSAGE_SIZE + "-" + SLOW_SUBSCRIPTIONS + ".zip";
}
@Before
public void before() throws Exception {
Assume.assumeTrue(TEST_ENABLED);
cleanupData(SERVER_NAME_0);
boolean useZip = ZIP_LOCATION != null;
String zipName = getZipName();
File zipFile = useZip ? new File(ZIP_LOCATION + "/" + zipName) : null;
if (ZIP_LOCATION != null && zipFile.exists()) {
unzipped = true;
unzip(zipFile, new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
}
private void receive(ConnectionFactory factory, String clientID, String name, int txInterval, AtomicInteger errors, AtomicInteger sleepTime, int numberOfMessages, Runnable callbackDone) {
try {
Connection connection = factory.createConnection();
try {
connection.setClientID(clientID);
connection.start();
Session session;
if (txInterval == 0) {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} else {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
}
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, name);
int commitPending = 0;
for (int i = 0; i < numberOfMessages; i++) {
Message message = subscriber.receive(5_000);
if (message == null) {
logger.info("Receiver " + clientID + " subscription " + name + " did not receibe a message");
i--;
continue;
}
if (i % PRINT_INTERVAL == 0) {
logger.info("Received " + i + " on " + clientID + "_" + name);
}
Assert.assertEquals(i, message.getIntProperty("m"));
if (txInterval > 0) {
commitPending++;
if (commitPending >= txInterval) {
session.commit();
commitPending = 0;
}
}
int timeout = sleepTime.get();
if (timeout > 0) {
logger.info("Sleeping for " + timeout + " on " + clientID + "_" + name);
Thread.sleep(timeout);
}
}
} finally {
connection.close();
if (callbackDone != null) {
callbackDone.run();
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
}
}
@Test
public void testSubscription() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
AtomicInteger errors = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(SLOW_SUBSCRIPTIONS + 1);
runAfter(service::shutdownNow);
final ConnectionFactory factoryClient;
if (protocol.equals("CORE")) {
factoryClient = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616?consumerWindowSize=-1");
} else if (protocol.equals("AMQP")) {
factoryClient = CFUtil.createConnectionFactory("AMQP", "amqp://localhost:61616?jms.prefetchPolicy.queuePrefetch=-1");
} else if (protocol.equals("OPENWIRE")) {
factoryClient = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616"); // no flow control on openwire by default
} else {
factoryClient = CFUtil.createConnectionFactory("OPENWIRE", "tcp://localhost:61616"); // no flow control on openwire by default
}
{
// just creating the subscriptions before we actually send messages
CountDownLatch countDownLatch = new CountDownLatch(SLOW_SUBSCRIPTIONS + 1);
service.execute(() -> receive(factoryClient, "fast", "fast", 0, errors, new AtomicInteger(0), 0, countDownLatch::countDown));
for (int i = 0; i < SLOW_SUBSCRIPTIONS; i++) {
final int finalI = i;
service.execute(() -> receive(factoryClient, "slow_" + finalI, "slow_" + finalI, 0, errors, new AtomicInteger(0), 0, countDownLatch::countDown));
}
Assert.assertTrue(countDownLatch.await(1, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
}
if (!unzipped) {
Connection connection = factory.createConnection();
runAfter(connection::close);
String text;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < MESSAGE_SIZE) {
buffer.append("a big string...");
}
text = buffer.toString();
}
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(destination);
for (int m = 0; m < MESSAGES; m++) {
TextMessage message = session.createTextMessage(text);
message.setIntProperty("m", m);
producer.send(message);
if (m > 0 && m % COMMIT_INTERVAL == 0) {
logger.info("Sent " + m + " " + protocol + " messages on queue " + destination);
session.commit();
}
}
session.commit();
session.close();
connection.close();
killServer(serverProcess);
}
if (ZIP_LOCATION != null && !unzipped) {
String fileName = getZipName();
zip(new File(ZIP_LOCATION, fileName), new File(getServerLocation(SERVER_NAME_0)));
}
serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
// just creating the subscriptions before we actually send messages
CountDownLatch countDownLatch = new CountDownLatch(SLOW_SUBSCRIPTIONS + 1);
AtomicInteger sleepInterval = new AtomicInteger(SLEEP_SLOW);
service.execute(() -> receive(factoryClient, "fast", "fast", RECEIVE_COMMIT_INTERVAL, errors, new AtomicInteger(0), MESSAGES, () -> {
// after the fast consumer is done, the slow consumers will become fast
sleepInterval.set(0);
countDownLatch.countDown();
}));
for (int i = 0; i < SLOW_SUBSCRIPTIONS; i++) {
final int finalI = i;
service.execute(() -> receive(factoryClient, "slow_" + finalI, "slow_" + finalI, RECEIVE_COMMIT_INTERVAL, errors, sleepInterval, MESSAGES, countDownLatch::countDown));
}
Assert.assertTrue(countDownLatch.await(TIMEOUT_MINUTES, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
service.shutdown();
Assert.assertTrue("Test Timed Out", service.awaitTermination(1, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
}
}

View File

@ -1,47 +0,0 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# this script contains a suggest set of variables to run the HorizontalPagingTest in a medium environment and hit some issues we used to have with paging
# It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder
export TEST_HORIZONTAL_ZIP_LOCATION=/tmp
export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000
export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP
export TEST_HORIZONTAL_CORE_DESTINATIONS=2
export TEST_HORIZONTAL_CORE_MESSAGES=1000
export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10
export TEST_HORIZONTAL_AMQP_DESTINATIONS=2
export TEST_HORIZONTAL_AMQP_MESSAGES=1000
export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10
export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=2
export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000
export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10

View File

@ -18,9 +18,13 @@
# this script contains a suggest set of variables to run the HorizontalPagingTest in a medium environment and hit some issues we used to have with paging
## Generic variable:
# It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder
#export TEST_HORIZONTAL_ZIP_LOCATION=/place/to/my/zip
#export TEST_ZIP_LOCATION=~/zipTest/
#HorizontalPagingTest
export TEST_HORIZONTAL_TEST_ENABLED=true
export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000
export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP
@ -44,4 +48,43 @@ export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000
export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100
export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0
export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000
export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10
export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10
export TEST_FLOW_SERVER_START_TIMEOUT=300000
export TEST_FLOW_TIMEOUT_MINUTES=120
# FlowControlPagingTest
export TEST_FLOW_PROTOCOL_LIST=CORE,AMQP,OPENWIRE
export TEST_FLOW_PRINT_INTERVAL=100
export TEST_FLOW_OPENWIRE_MESSAGES=10000
export TEST_FLOW_OPENWIRE_COMMIT_INTERVAL=1000
export TEST_FLOW_OPENWIRE_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_OPENWIRE_MESSAGE_SIZE=60000
export TEST_FLOW_CORE_MESSAGES=10000
export TEST_FLOW_CORE_COMMIT_INTERVAL=1000
export TEST_FLOW_CORE_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_CORE_MESSAGE_SIZE=30000
export TEST_FLOW_AMQP_MESSAGES=10000
export TEST_FLOW_AMQP_COMMIT_INTERVAL=1000
export TEST_FLOW_AMQP_RECEIVE_COMMIT_INTERVAL=10
export TEST_FLOW_AMQP_MESSAGE_SIZE=30000
# SubscriptionPagingTest
export TEST_SUBSCRIPTION_PROTOCOL_LIST=CORE
export TEST_SUBSCRIPTION_SERVER_START_TIMEOUT=300000
export TEST_SUBSCRIPTION_TIMEOUT_MINUTES=120
export TEST_SUBSCRIPTION_PRINT_INTERVAL=100
export TEST_SUBSCRIPTION_SLOW_SUBSCRIPTIONS=5
export TEST_SUBSCRIPTION_CORE_MESSAGES=10000
export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000
export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0
export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000
export TEST_SUBSCRIPTION_SLEEP_SLOW=1000