This commit is contained in:
Clebert Suconic 2019-08-25 23:41:03 -04:00
commit a848572292
14 changed files with 238 additions and 13 deletions

View File

@ -98,6 +98,18 @@ public final class Validators {
}
};
public static final Validator LE_ONE = new Validator() {
@Override
public void validate(final String name, final Object value) {
Number val = (Number) value;
if (val.doubleValue() <= 1) {
// OK
} else {
throw ActiveMQMessageBundle.BUNDLE.lessThanOrEqualToOne(name, val);
}
}
};
public static final Validator MINUS_ONE_OR_GT_ZERO = new Validator() {
@Override
public void validate(final String name, final Object value) {

View File

@ -166,6 +166,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
private static final String REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME = "redelivery-collision-avoidance-factor";
private static final String MAX_REDELIVERY_DELAY_NODE_NAME = "max-redelivery-delay";
private static final String MAX_DELIVERY_ATTEMPTS = "max-delivery-attempts";
@ -1046,6 +1048,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setRedeliveryMultiplier(XMLUtil.parseDouble(child));
} else if (REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME.equalsIgnoreCase(name)) {
double redeliveryCollisionAvoidanceFactor = XMLUtil.parseDouble(child);
Validators.GE_ZERO.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
Validators.LE_ONE.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
} else if (MAX_REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
} else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {

View File

@ -479,4 +479,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229227, value = "{0} must be equals to -1 or greater than 0 and less than or equal to Integer.MAX_VALUE (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException inRangeOfPositiveIntThanMinusOne(String name, Number val);
@Message(id = 229228, value = "{0} must be less than or equal to 1 (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException lessThanOrEqualToOne(String name, Number val);
}

View File

@ -28,10 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@ -3617,9 +3619,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();
double redeliveryMultiplier = addressSettings.getRedeliveryMultiplier();
double collisionAvoidanceFactor = addressSettings.getRedeliveryCollisionAvoidanceFactor();
int tmpDeliveryCount = deliveryCount > 0 ? deliveryCount - 1 : 0;
long delay = (long) (redeliveryDelay * (Math.pow(redeliveryMultiplier, tmpDeliveryCount)));
if (collisionAvoidanceFactor > 0) {
Random random = ThreadLocalRandom.current();
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
delay += (delay * variance);
}
if (delay > maxRedeliveryDelay) {
delay = maxRedeliveryDelay;

View File

@ -54,6 +54,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final double DEFAULT_REDELIVER_MULTIPLIER = 1.0;
public static final double DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR = 0.0;
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
@Deprecated
@ -125,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Double redeliveryMultiplier = null;
private Double redeliveryCollisionAvoidanceFactor = null;
private Long maxRedeliveryDelay = null;
private SimpleString deadLetterAddress = null;
@ -223,6 +227,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.messageCounterHistoryDayLimit = other.messageCounterHistoryDayLimit;
this.redeliveryDelay = other.redeliveryDelay;
this.redeliveryMultiplier = other.redeliveryMultiplier;
this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
this.maxRedeliveryDelay = other.maxRedeliveryDelay;
this.deadLetterAddress = other.deadLetterAddress;
this.expiryAddress = other.expiryAddress;
@ -566,6 +571,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public double getRedeliveryCollisionAvoidanceFactor() {
return redeliveryCollisionAvoidanceFactor != null ? redeliveryCollisionAvoidanceFactor : AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR;
}
public AddressSettings setRedeliveryCollisionAvoidanceFactor(final double redeliveryCollisionAvoidanceFactor) {
this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
return this;
}
public long getMaxRedeliveryDelay() {
// default is redelivery-delay * 10 as specified on the docs and at this JIRA:
// https://issues.jboss.org/browse/HORNETQ-1263
@ -776,6 +790,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (redeliveryMultiplier == null) {
redeliveryMultiplier = merged.redeliveryMultiplier;
}
if (redeliveryCollisionAvoidanceFactor == null) {
redeliveryCollisionAvoidanceFactor = merged.redeliveryCollisionAvoidanceFactor;
}
if (maxRedeliveryDelay == null) {
maxRedeliveryDelay = merged.maxRedeliveryDelay;
}
@ -1059,6 +1076,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultRingSize = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
redeliveryCollisionAvoidanceFactor = BufferHelper.readNullableDouble(buffer);
}
}
@Override
@ -1073,6 +1094,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
BufferHelper.sizeOfNullableLong(redeliveryDelay) +
BufferHelper.sizeOfNullableDouble(redeliveryMultiplier) +
BufferHelper.sizeOfNullableDouble(redeliveryCollisionAvoidanceFactor) +
BufferHelper.sizeOfNullableLong(maxRedeliveryDelay) +
SimpleString.sizeofNullableString(deadLetterAddress) +
SimpleString.sizeofNullableString(expiryAddress) +
@ -1210,6 +1232,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, defaultRingSize);
BufferHelper.writeNullableDouble(buffer, redeliveryCollisionAvoidanceFactor);
}
/* (non-Javadoc)
@ -1235,6 +1259,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
result = prime * result + ((redeliveryMultiplier == null) ? 0 : redeliveryMultiplier.hashCode());
result = prime * result + ((redeliveryCollisionAvoidanceFactor == null) ? 0 : redeliveryCollisionAvoidanceFactor.hashCode());
result = prime * result + ((maxRedeliveryDelay == null) ? 0 : maxRedeliveryDelay.hashCode());
result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
@ -1363,6 +1388,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!redeliveryMultiplier.equals(other.redeliveryMultiplier))
return false;
if (redeliveryCollisionAvoidanceFactor == null) {
if (other.redeliveryCollisionAvoidanceFactor != null)
return false;
} else if (!redeliveryCollisionAvoidanceFactor.equals(other.redeliveryCollisionAvoidanceFactor))
return false;
if (maxRedeliveryDelay == null) {
if (other.maxRedeliveryDelay != null)
return false;
@ -1580,6 +1610,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
redeliveryDelay +
", redeliveryMultiplier=" +
redeliveryMultiplier +
", redeliveryCollisionAvoidanceFactor=" +
redeliveryCollisionAvoidanceFactor +
", maxRedeliveryDelay=" +
maxRedeliveryDelay +
", redistributionDelay=" +

View File

@ -2991,6 +2991,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
factor by which to modify the redelivery delay slightly to avoid collisions
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -343,6 +343,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(856686592L, conf.getAddressesSettings().get("a1").getMaxSizeBytes());
assertEquals(817381738L, conf.getAddressesSettings().get("a1").getPageSizeBytes());
assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
@ -365,6 +366,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(932489234928324L, conf.getAddressesSettings().get("a2").getMaxSizeBytes());
assertEquals(712671626L, conf.getAddressesSettings().get("a2").getPageSizeBytes());
assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());

View File

@ -383,6 +383,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>

View File

@ -19,6 +19,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>

View File

@ -2957,6 +2957,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
factor by which to modify the redelivery delay slightly to avoid collisions
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -573,6 +573,7 @@ that would be found in the `broker.xml` file.
<expiry-delay>123</expiry-delay>
<redelivery-delay>5000</redelivery-delay>
<redelivery-delay-multiplier>1.0</redelivery-delay-multiplier>
<redelivery-collision-avoidance-factor>0.0</redelivery-collision-avoidance-factor>
<max-redelivery-delay>10000</max-redelivery-delay>
<max-delivery-attempts>3</max-delivery-attempts>
<max-size-bytes>100000</max-size-bytes>
@ -660,6 +661,11 @@ messages](undelivered-messages.md#configuring-delayed-redelivery).
Default is `1.0`. Read more about [undelivered
messages](undelivered-messages.md#configuring-delayed-redelivery).
`redelivery-collision-avoidance-factor` defines an additional factor used to
calculate an adjustment to the `redelivery-delay` (up or down). Default is
`0.0`. Valid values are between 0.0 and 1.0. Read more about [undelivered
messages](undelivered-messages.md#configuring-delayed-redelivery).
`max-size-bytes`, `page-size-bytes`, & `page-max-cache-size` are used to
configure paging on an address. This is explained
[here](paging.md#configuration).

View File

@ -204,10 +204,11 @@ Name | Description | Default
[match](address-model.md) | The filter to apply to the setting | n/a
[dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
[expiry-address](message-expiry.md) | Expired messages address | n/a
[expiry-delay](address-model.md) | Expiration time override; -1 don't override | -1
[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
[redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message | 0
[redelivery-delay-multiplier](address-model.md) | Multiplier to apply to the `redelivery-delay` | 1.0
[max-redelivery-delay](address-model.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
[redelivery-delay-multiplier](undelivered-messages.md) | Multiplier to apply to the `redelivery-delay` | 1.0
[redelivery-collision-avoidance-factor](undelivered-messages.md) | an additional factor used to calculate an adjustment to the `redelivery-delay` (up or down) | 0.0
[max-redelivery-delay](undelivered-messages.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
[max-delivery-attempts](undelivered-messages.md)| Number of retries before dead letter address| 10
[max-size-bytes](paging.md)| Max size a queue can be before invoking `address-full-policy` | -1
[max-size-bytes-reject-threshold]() | Used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` **for AMQP clients only**. | -1

View File

@ -42,6 +42,8 @@ Delayed redelivery is defined in the address-setting configuration:
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<!-- default is 0 (no delay) -->
<redelivery-delay>5000</redelivery-delay>
<!-- default is 0.0) -->
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<!-- default is redelivery-delay * 10 -->
<max-redelivery-delay>50000</max-redelivery-delay>
</address-setting>
@ -59,24 +61,60 @@ message will be sent asynchronously back to the queue after the delay.
You can specify a multiplier (the `redelivery-delay-multiplier`) that will
take effect on top of the `redelivery-delay`. Each time a message is redelivered
the delay period will be equal to the previous delay * `redelivery-delay-multiplier`.
A max-redelivery-delay can be set to prevent the delay from becoming too large.
The max-redelivery-delay is defaulted to redelivery-delay \* 10.
A `max-redelivery-delay` can be set to prevent the delay from becoming too large.
The `max-redelivery-delay` is defaulted to `redelivery-delay` \* 10.
Example:
**Example:**
- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000
- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000,
redelivery-collision-avoidance-factor=0.0
1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 5000
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 10000 // (5000 * 2) < max-delay-period. Use 10000
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 15000 // (10000 * 2) > max-delay-period: Use max-delay-delivery
1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 5000
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 10000 // (5000 * 2) < max-delay-period. Use 10000
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 15000 // (10000 * 2) > max-delay-period: Use max-delay-delivery
Address wildcards can be used to configure redelivery delay for a set of
addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)), so you don't have to specify redelivery delay
individually for each address.
The `redelivery-delay` can be also be modified by configuring the
`redelivery-collision-avoidance-factor`. This factor will be made either
positive or negative at random to control whether the ultimate value will
increase or decrease the `redelivery-delay`. Then it's multiplied by a random
number between 0.0 and 1.0. This result is then multiplied by the
`redelivery-delay` and then added to the `redelivery-delay` to arrive at the
final value.
The algorithm may sound complicated but the bottom line is quite simple: the
larger `redelivery-collision-avoidance-factor` you choose the larger the variance
of the `redelivery-delay` will be. The `redelivery-collision-avoidance-factor`
must be between 0.0 and 1.0.
**Example:**
- redelivery-delay=1000, redelivery-delay-multiplier=1, max-redelivery-delay=15000,
redelivery-collision-avoidance-factor=0.5, (bold values chosen using
`java.util.Random`)
1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 * __-1__) * __.25__)
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 * __1__) * __.75__)
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 * __-1__) * __.05__)
This feature can be particularly useful in environments where there are
multiple consumers on the same queue all interacting transactionally
with the same external system (e.g. a database). If there is overlapping
data in messages which are consumed concurrently then one transaction can
succeed while all the rest fail. If those failed messages are redelivered
at the same time then this process where one consumer succeeds and the
rest fail will continue. By randomly padding the redelivery-delay by a
small, configurable amount these redelivery "collisions" can be avoided.
### Example
See [the examples chapter](examples.md) for an example which shows how delayed redelivery is configured

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.client;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -302,6 +303,103 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase {
}
@Test
public void testRedeliveryCollisionAvoidance() throws Exception {
setUp(false);
int numberOfThreads = 10;
long redeliveryDelay = 1000;
server.getAddressSettingsRepository().getMatch(ADDRESS.toString()).setRedeliveryDelay(redeliveryDelay).setRedeliveryCollisionAvoidanceFactor(0.5);
ClientSession session = factory.createSession(false, false, false);
ClientProducer prod = session.createProducer(ADDRESS);
for (int i = 0; i < numberOfThreads; i++) {
prod.send(createTextMessage(session, "Hello" + i));
}
session.commit();
session.close();
final CountDownLatch aligned = new CountDownLatch(numberOfThreads);
final CountDownLatch startRollback = new CountDownLatch(1);
class ConsumerThread extends Thread {
ConsumerThread(int i) {
super("RedeliveryCollisionAvoidance::" + i);
}
long delay = 0;
int errors = 0;
@Override
public void run() {
try (ServerLocator locator = createInVMNonHALocator()) {
locator.setConsumerWindowSize(0);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
msg.acknowledge();
aligned.countDown();
startRollback.await();
session.rollback();
long start = System.currentTimeMillis();
msg = consumer.receive(5000);
delay = System.currentTimeMillis() - start;
assertNotNull(msg);
msg.acknowledge();
session.commit();
} catch (Exception e) {
e.printStackTrace();
errors++;
}
}
}
ConsumerThread[] threads = new ConsumerThread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
threads[i] = new ConsumerThread(i);
threads[i].start();
}
aligned.await();
startRollback.countDown();
try {
for (ConsumerThread t : threads) {
t.join(60000);
assertFalse(t.isAlive());
assertEquals("There are Errors on the test thread", 0, t.errors);
}
} finally {
for (ConsumerThread t : threads) {
if (t.isAlive()) {
t.interrupt();
}
t.join(1000);
}
}
long maxDelay = 0;
long minDelay = Long.MAX_VALUE;
for (ConsumerThread t : threads) {
if (t.delay < minDelay) {
minDelay = t.delay;
}
if (t.delay > maxDelay) {
maxDelay = t.delay;
}
}
// make sure the difference between the minimum redelivery delay and the maximum redelivery delay is larger that the expected nominal variance
assertTrue((maxDelay - minDelay) > (redeliveryDelay * .05));
factory.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------