ARTEMIS-2504 implement retroactive addresses

A new feature to preserve messages sent to an address for queues that will be
created on the address in the future. This is essentially equivalent to the
"retroactive consumer" feature from 5.x. However, it's implemented in a way
that fits with the address model of Artemis.
This commit is contained in:
Justin Bertram 2019-08-28 10:27:52 -05:00 committed by Clebert Suconic
parent c0e77e96d1
commit 84067d8fef
50 changed files with 1320 additions and 51 deletions

View File

@ -2277,4 +2277,11 @@ public interface AuditLogger extends BasicLogger {
void getRingSize(String user, Object source, Object... args);
static void isRetroactiveResource(Object source) {
LOGGER.isRetroactiveResource(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601503, value = "User {0} is getting retroactiveResource property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isRetroactiveResource(String user, Object source, Object... args);
}

View File

@ -568,6 +568,8 @@ public final class ActiveMQDefaultConfiguration {
public static final long DEFAULT_ANALYZE_CRITICAL_TIMEOUT = 120000;
public static final long DEFAULT_RETROACTIVE_MESSAGE_COUNT = 0;
public static final CriticalAnalyzerPolicy DEFAULT_ANALYZE_CRITICAL_POLICY = CriticalAnalyzerPolicy.LOG;
/**
@ -1437,6 +1439,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_RING_SIZE;
}
public static long getDefaultRetroactiveMessageCount() {
return DEFAULT_RETROACTIVE_MESSAGE_COUNT;
}
public static int getDefaultConsumersBeforeDispatch() {
return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
}

View File

@ -1384,7 +1384,8 @@ public interface ActiveMQServerControl {
@Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
@Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
@Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor) throws Exception;
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount) throws Exception;
@Operation(desc = "Remove address settings", impact = MBeanOperationInfo.ACTION)
void removeAddressSettings(@Parameter(desc = "an address match", name = "addressMatch") String addressMatch) throws Exception;

View File

@ -162,4 +162,7 @@ public interface AddressControl {
@Attribute(desc = "indicates if the queues bound to this address are paused")
boolean isPaused();
@Attribute(desc = "whether this address is used for a retroactive address")
boolean isRetroactiveResource();
}

View File

@ -115,6 +115,8 @@ public final class AddressSettingsInfo {
private final double redeliveryCollisionAvoidanceFactor;
private final long retroactiveMessageCount;
// Static --------------------------------------------------------
public static AddressSettingsInfo from(final String jsonString) {
@ -164,7 +166,8 @@ public final class AddressSettingsInfo {
object.getJsonNumber("autoDeleteQueuesDelay").longValue(),
object.getJsonNumber("autoDeleteQueuesMessageCount").longValue(),
object.getJsonNumber("autoDeleteAddressesDelay").longValue(),
object.getJsonNumber("redeliveryCollisionAvoidanceFactor").doubleValue());
object.getJsonNumber("redeliveryCollisionAvoidanceFactor").doubleValue(),
object.getJsonNumber("retroactiveMessageCount").longValue());
}
// Constructors --------------------------------------------------
@ -214,7 +217,8 @@ public final class AddressSettingsInfo {
long autoDeleteQueuesDelay,
long autoDeleteQueuesMessageCount,
long autoDeleteAddressesDelay,
double redeliveryCollisionAvoidanceFactor) {
double redeliveryCollisionAvoidanceFactor,
long retroactiveMessageCount) {
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
this.pageSizeBytes = pageSizeBytes;
@ -261,6 +265,7 @@ public final class AddressSettingsInfo {
this.autoDeleteQueuesMessageCount = autoDeleteQueuesMessageCount;
this.autoDeleteAddressesDelay = autoDeleteAddressesDelay;
this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
this.retroactiveMessageCount = retroactiveMessageCount;
}
// Public --------------------------------------------------------
@ -456,5 +461,9 @@ public final class AddressSettingsInfo {
public double getRedeliveryCollisionAvoidanceFactor() {
return redeliveryCollisionAvoidanceFactor;
}
public long getRetroactiveMessageCount() {
return retroactiveMessageCount;
}
}

View File

@ -85,4 +85,7 @@ public interface DivertControl {
*/
@Attribute(desc = "routing type used by this divert")
String getRoutingType();
@Attribute(desc = "whether this divert is for a retroactive address")
boolean isRetroactiveResource();
}

View File

@ -69,6 +69,12 @@ public interface QueueControl {
@Attribute(desc = "whether this queue is temporary")
boolean isTemporary();
/**
* Returns whether this queue is used for a retroactive address.
*/
@Attribute(desc = "whether this queue is used for a retroactive address")
boolean isRetroactiveResource();
/**
* Returns whether this queue is durable.
*/

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.api.core.management;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
/**
* Helper class used to build resource names used by management messages.
* <br>
@ -39,7 +42,33 @@ public final class ResourceNames {
public static final String BROADCAST_GROUP = "broadcastgroup.";
private ResourceNames() {
public static final String RETROACTIVE_SUFFIX = "retro";
public static SimpleString getRetroactiveResourceQueueName(String prefix, String delimiter, SimpleString address, RoutingType routingType) {
return getRetroactiveResourceName(prefix, delimiter, address, trimLastCharacter(QUEUE).concat(delimiter).concat(routingType.toString().toLowerCase()));
}
public static SimpleString getRetroactiveResourceAddressName(String prefix, String delimiter, SimpleString address) {
return getRetroactiveResourceName(prefix, delimiter, address, trimLastCharacter(ADDRESS));
}
public static SimpleString getRetroactiveResourceDivertName(String prefix, String delimiter, SimpleString address) {
return getRetroactiveResourceName(prefix, delimiter, address, trimLastCharacter(DIVERT));
}
private static SimpleString getRetroactiveResourceName(String prefix, String delimiter, SimpleString address, String resourceType) {
return SimpleString.toSimpleString(prefix.concat(address.toString()).concat(delimiter).concat(resourceType).concat(delimiter).concat(RETROACTIVE_SUFFIX));
}
public static boolean isRetroactiveResource(String prefix, SimpleString address) {
return address.toString().startsWith(prefix) && address.toString().endsWith(RETROACTIVE_SUFFIX);
}
public static String decomposeRetroactiveResourceAddressName(String prefix, String delimiter, String address) {
return address.substring(address.indexOf(prefix) + prefix.length(), address.indexOf(delimiter + trimLastCharacter(ADDRESS)));
}
private static String trimLastCharacter(String toTrim) {
return toTrim.substring(0, toTrim.length() - 1);
}
}

View File

@ -76,7 +76,8 @@ public class AddressSettingsInfoTest {
"\"autoDeleteQueuesDelay\":4,\n" +
"\"autoDeleteQueuesMessageCount\":8,\n" +
"\"autoDeleteAddressesDelay\":3003,\n" +
"\"redeliveryCollisionAvoidanceFactor\":1.1\n" +
"\"redeliveryCollisionAvoidanceFactor\":1.1,\n" +
"\"retroactiveMessageCount\":101\n" +
"}";
AddressSettingsInfo addressSettingsInfo = AddressSettingsInfo.from(json);
assertEquals("fullPolicy", addressSettingsInfo.getAddressFullMessagePolicy());
@ -125,6 +126,7 @@ public class AddressSettingsInfoTest {
assertEquals(8, addressSettingsInfo.getAutoDeleteQueuesMessageCount());
assertEquals(3003, addressSettingsInfo.getAutoDeleteAddressesDelay());
assertEquals(1.1, addressSettingsInfo.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(101, addressSettingsInfo.getRetroactiveMessageCount());
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class ResourceNamesTest extends Assert {
char delimiterChar;
final String delimiter;
final SimpleString testAddress;
final String prefix;
final String baseName;
final String testResourceAddressName;
final String testResourceMulticastQueueName;
final String testResourceAnycastQueueName;
final String testResourceDivertName;
@Parameterized.Parameters(name = "delimiterChar={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][] {{'/'}, {'.'}});
}
public ResourceNamesTest(char delimiterChar) {
super();
this.delimiterChar = delimiterChar;
delimiter = "" + delimiterChar;
testAddress = SimpleString.toSimpleString(UUID.randomUUID().toString());
prefix = ActiveMQDefaultConfiguration.getInternalNamingPrefix().replace('.', delimiterChar);
baseName = prefix + testAddress + delimiter;
testResourceAddressName = baseName + ResourceNames.ADDRESS.replace('.', delimiterChar) + ResourceNames.RETROACTIVE_SUFFIX;
testResourceMulticastQueueName = baseName + ResourceNames.QUEUE.replace('.', delimiterChar) + RoutingType.MULTICAST.toString().toLowerCase() + delimiter + ResourceNames.RETROACTIVE_SUFFIX;
testResourceAnycastQueueName = baseName + ResourceNames.QUEUE.replace('.', delimiterChar) + RoutingType.ANYCAST.toString().toLowerCase() + delimiter + ResourceNames.RETROACTIVE_SUFFIX;
testResourceDivertName = baseName + ResourceNames.DIVERT.replace('.', delimiterChar) + ResourceNames.RETROACTIVE_SUFFIX;
}
@Test
public void testGetRetroactiveResourceAddressName() {
assertEquals(testResourceAddressName, ResourceNames.getRetroactiveResourceAddressName(prefix, delimiter, testAddress).toString());
}
@Test
public void testGetRetroactiveResourceQueueName() {
assertEquals(testResourceMulticastQueueName, ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, testAddress, RoutingType.MULTICAST).toString());
assertEquals(testResourceAnycastQueueName, ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, testAddress, RoutingType.ANYCAST).toString());
}
@Test
public void testGetRetroactiveResourceDivertName() {
assertEquals(testResourceDivertName, ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, testAddress).toString());
}
@Test
public void testDecomposeRetroactiveResourceAddressName() {
assertEquals(testAddress.toString(), ResourceNames.decomposeRetroactiveResourceAddressName(prefix, delimiter, testResourceAddressName));
}
@Test
public void testIsRetroactiveResource() {
assertTrue(ResourceNames.isRetroactiveResource(prefix, SimpleString.toSimpleString(testResourceAddressName)));
assertTrue(ResourceNames.isRetroactiveResource(prefix, SimpleString.toSimpleString(testResourceMulticastQueueName)));
assertTrue(ResourceNames.isRetroactiveResource(prefix, SimpleString.toSimpleString(testResourceDivertName)));
}
}

View File

@ -270,6 +270,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String DEFAULT_RING_SIZE = "default-ring-size";
private static final String RETROACTIVE_MESSAGE_COUNT = "retroactive-message-count";
// Attributes ----------------------------------------------------
@ -1174,6 +1176,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
} else if (DEFAULT_RING_SIZE.equalsIgnoreCase(name)) {
addressSettings.setDefaultRingSize(XMLUtil.parseLong(child));
} else if (RETROACTIVE_MESSAGE_COUNT.equalsIgnoreCase(name)) {
long retroactiveMessageCount = XMLUtil.parseLong(child);
Validators.GE_ZERO.validate(RETROACTIVE_MESSAGE_COUNT, retroactiveMessageCount);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
}
}
return setting;

View File

@ -2752,6 +2752,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
.add("autoDeleteQueuesMessageCount", addressSettings.getAutoDeleteQueuesMessageCount())
.add("autoDeleteAddressesDelay", addressSettings.getAutoDeleteAddressesDelay())
.add("redeliveryCollisionAvoidanceFactor", addressSettings.getRedeliveryCollisionAvoidanceFactor())
.add("retroactiveMessageCount", addressSettings.getRetroactiveMessageCount())
.build()
.toString();
}
@ -2863,12 +2864,12 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES.toString(),
AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES.toString(),
AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD,
ActiveMQDefaultConfiguration.getDefaultLastValueKey().toString(),
ActiveMQDefaultConfiguration.getDefaultLastValueKey() == null ? null : ActiveMQDefaultConfiguration.getDefaultLastValueKey().toString(),
ActiveMQDefaultConfiguration.getDefaultNonDestructive(),
ActiveMQDefaultConfiguration.getDefaultExclusive(),
ActiveMQDefaultConfiguration.getDefaultGroupRebalance(),
ActiveMQDefaultConfiguration.getDefaultGroupBuckets(),
ActiveMQDefaultConfiguration.getDefaultGroupFirstKey().toString(),
ActiveMQDefaultConfiguration.getDefaultGroupFirstKey() == null ? null : ActiveMQDefaultConfiguration.getDefaultGroupFirstKey().toString(),
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(),
@ -2881,7 +2882,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_DELAY,
AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_MESSAGE_COUNT,
AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES_DELAY,
AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR);
AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR,
ActiveMQDefaultConfiguration.getDefaultRetroactiveMessageCount());
}
@Override
@ -2932,7 +2934,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final long autoDeleteQueuesDelay,
final long autoDeleteQueuesMessageCount,
final long autoDeleteAddressesDelay,
final double redeliveryCollisionAvoidanceFactor) throws Exception {
final double redeliveryCollisionAvoidanceFactor,
final long retroactiveMessageCount) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.addAddressSettings(this.server, address, DLA, expiryAddress, expiryDelay, defaultLastValueQueue, maxDeliveryAttempts,
maxSizeBytes, pageSizeBytes, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier,
@ -2944,7 +2947,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
defaultGroupFirstKey, defaultMaxConsumers, defaultPurgeOnNoConsumers, defaultConsumersBeforeDispatch,
defaultDelayBeforeDispatch, defaultQueueRoutingType, defaultAddressRoutingType, defaultConsumerWindowSize,
defaultRingSize, autoDeleteCreatedQueues, autoDeleteQueuesDelay, autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor);
autoDeleteAddressesDelay, redeliveryCollisionAvoidanceFactor, retroactiveMessageCount);
}
checkStarted();
@ -3005,6 +3008,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
addressSettings.setAutoDeleteQueuesMessageCount(autoDeleteQueuesMessageCount);
addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay);
addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
addressSettings.setRetroactiveMessageCount(retroactiveMessageCount);
server.getAddressSettingsRepository().addMatch(address, addressSettings);

View File

@ -419,6 +419,14 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return addressInfo.isPaused();
}
@Override
public boolean isRetroactiveResource() {
if (AuditLogger.isEnabled()) {
AuditLogger.isRetroactiveResource(this.addressInfo);
}
return ResourceNames.isRetroactiveResource(server.getInternalNamingPrefix(), addressInfo.getName());
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.Divert;
@ -38,6 +39,8 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
private final DivertConfiguration configuration;
private final String internalNamingPrefix;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -46,10 +49,12 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
public DivertControlImpl(final Divert divert,
final StorageManager storageManager,
final DivertConfiguration configuration) throws Exception {
final DivertConfiguration configuration,
final String internalNamingPrefix) throws Exception {
super(DivertControl.class, storageManager);
this.divert = divert;
this.configuration = configuration;
this.internalNamingPrefix = internalNamingPrefix;
}
@Override
@ -177,6 +182,14 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
}
}
@Override
public boolean isRetroactiveResource() {
if (AuditLogger.isEnabled()) {
AuditLogger.isRetroactiveResource(this.divert);
}
return ResourceNames.isRetroactiveResource(internalNamingPrefix, divert.getUniqueName());
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(DivertControl.class);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
@ -234,6 +235,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public boolean isRetroactiveResource() {
if (AuditLogger.isEnabled()) {
AuditLogger.isRetroactiveResource(queue);
}
checkStarted();
clearIO();
try {
return ResourceNames.isRetroactiveResource(server.getInternalNamingPrefix(), queue.getName());
} finally {
blockOnIO();
}
}
@Override
public long getMessageCount() {
if (AuditLogger.isEnabled()) {

View File

@ -42,6 +42,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -63,6 +65,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -78,6 +81,7 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@ -459,6 +463,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (server.hasBrokerAddressPlugins()) {
server.callBrokerAddressPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload));
}
long retroactiveMessageCount = addressSettingsRepository.getMatch(addressInfo.getName().toString()).getRetroactiveMessageCount();
if (retroactiveMessageCount > 0) {
createRetroactiveResources(addressInfo.getName(), retroactiveMessageCount, reload);
}
if (ResourceNames.isRetroactiveResource(server.getInternalNamingPrefix(), addressInfo.getName())) {
registerRepositoryListenerForRetroactiveAddress(addressInfo.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
@ -467,6 +478,127 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
private void registerRepositoryListenerForRetroactiveAddress(SimpleString name) {
HierarchicalRepositoryChangeListener repositoryChangeListener = () -> {
String prefix = server.getInternalNamingPrefix();
String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
String address = ResourceNames.decomposeRetroactiveResourceAddressName(prefix, delimiter, name.toString());
AddressSettings settings = addressSettingsRepository.getMatch(address);
Queue internalAnycastQueue = server.locateQueue(ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, SimpleString.toSimpleString(address), RoutingType.ANYCAST));
if (internalAnycastQueue != null && internalAnycastQueue.getRingSize() != settings.getRetroactiveMessageCount()) {
internalAnycastQueue.setRingSize(settings.getRetroactiveMessageCount());
}
Queue internalMulticastQueue = server.locateQueue(ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, SimpleString.toSimpleString(address), RoutingType.MULTICAST));
if (internalMulticastQueue != null && internalMulticastQueue.getRingSize() != settings.getRetroactiveMessageCount()) {
internalMulticastQueue.setRingSize(settings.getRetroactiveMessageCount());
}
};
addressSettingsRepository.registerListener(repositoryChangeListener);
server.getAddressInfo(name).setRepositoryChangeListener(repositoryChangeListener);
}
private void createRetroactiveResources(final SimpleString retroactiveAddressName, final long retroactiveMessageCount, final boolean reload) throws Exception {
String prefix = server.getInternalNamingPrefix();
String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
final SimpleString internalAddressName = ResourceNames.getRetroactiveResourceAddressName(prefix, delimiter, retroactiveAddressName);
final SimpleString internalAnycastQueueName = ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, retroactiveAddressName, RoutingType.ANYCAST);
final SimpleString internalMulticastQueueName = ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, retroactiveAddressName, RoutingType.MULTICAST);
final SimpleString internalDivertName = ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, retroactiveAddressName);
if (!reload) {
AddressInfo addressInfo = new AddressInfo(internalAddressName)
.addRoutingType(RoutingType.MULTICAST)
.addRoutingType(RoutingType.ANYCAST)
.setInternal(false);
addAddressInfo(addressInfo);
AddressSettings addressSettings = addressSettingsRepository.getMatch(internalAddressName.toString());
server.createQueue(internalAddressName,
RoutingType.MULTICAST,
internalMulticastQueueName,
null,
null,
true,
false,
false,
false,
false,
0,
false,
false,
false,
addressSettings.getDefaultGroupBuckets(),
null,
addressSettings.isDefaultLastValueQueue(),
addressSettings.getDefaultLastValueKey(),
false,
0,
0L,
false,
0L,
0L,
false,
retroactiveMessageCount);
server.createQueue(internalAddressName,
RoutingType.ANYCAST,
internalAnycastQueueName,
null,
null,
true,
false,
false,
false,
false,
0,
false,
false,
false,
addressSettings.getDefaultGroupBuckets(),
null,
addressSettings.isDefaultLastValueQueue(),
addressSettings.getDefaultLastValueKey(),
false,
0,
0L,
false,
0L,
0L,
false,
retroactiveMessageCount);
}
server.deployDivert(new DivertConfiguration()
.setName(internalDivertName.toString())
.setAddress(retroactiveAddressName.toString())
.setExclusive(false)
.setForwardingAddress(internalAddressName.toString())
.setRoutingType(ComponentConfigurationRoutingType.PASS));
}
private void removeRetroactiveResources(SimpleString address) throws Exception {
String prefix = server.getInternalNamingPrefix();
String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
SimpleString internalDivertName = ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, address);
if (getBinding(internalDivertName) != null) {
server.destroyDivert(internalDivertName);
}
SimpleString internalAnycastQueueName = ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, address, RoutingType.ANYCAST);
if (server.locateQueue(internalAnycastQueueName) != null) {
server.destroyQueue(internalAnycastQueueName);
}
SimpleString internalMulticastQueueName = ResourceNames.getRetroactiveResourceQueueName(prefix, delimiter, address, RoutingType.MULTICAST);
if (server.locateQueue(internalMulticastQueueName) != null) {
server.destroyQueue(internalMulticastQueueName);
}
SimpleString internalAddressName = ResourceNames.getRetroactiveResourceAddressName(prefix, delimiter, address);
if (server.getAddressInfo(internalAddressName) != null) {
server.removeAddressInfo(internalAddressName, null);
}
}
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
@ -660,6 +792,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
managementService.unregisterAddress(address);
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
removeRetroactiveResources(address);
if (server.hasBrokerAddressPlugins()) {
server.callBrokerAddressPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo));
}

View File

@ -258,7 +258,7 @@ public class SimpleAddressManager implements AddressManager {
}
@Override
public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception {
public boolean reloadAddressInfo(AddressInfo addressInfo) {
boolean added = addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
if (added) {
addressInfo.registerMeters(metricsManager);

View File

@ -68,7 +68,7 @@ public interface RoutingContext {
void setAddress(SimpleString address);
void setRoutingType(RoutingType routingType);
RoutingContext setRoutingType(RoutingType routingType);
SimpleString getAddress(Message message);

View File

@ -3175,9 +3175,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
recoverStoredConfigs();
Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>();
journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos);
journalLoader.initAddresses(addressBindingInfos);
Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<>();
@ -3285,6 +3283,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
}
if (addressInfo.getRepositoryChangeListener() != null) {
addressSettingsRepository.unRegisterListener(addressInfo.getRepositoryChangeListener());
addressInfo.setRepositoryChangeListener(null);
}
long txID = storageManager.generateID();
storageManager.deleteAddressBinding(txID, addressInfo.getId());
storageManager.commitBindings(txID);
@ -3446,6 +3449,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
copyRetroactiveMessages(queue);
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
}
@ -3455,6 +3460,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return queue;
}
private void copyRetroactiveMessages(Queue queue) throws Exception {
if (addressSettingsRepository.getMatch(queue.getAddress().toString()).getRetroactiveMessageCount() > 0) {
Queue retroQueue = locateQueue(ResourceNames.getRetroactiveResourceQueueName(getInternalNamingPrefix(), getConfiguration().getWildcardConfiguration().getDelimiterString(), queue.getAddress(), queue.getRoutingType()));
if (retroQueue != null && retroQueue instanceof QueueImpl) {
((QueueImpl) retroQueue).rerouteMessages(queue.getName(), queue.getFilter());
}
}
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.PrefixUtil;
@ -65,6 +66,7 @@ public class AddressInfo {
private PostOffice postOffice;
private StorageManager storageManager;
private HierarchicalRepositoryChangeListener repositoryChangeListener;
public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class));
@ -279,8 +281,9 @@ public class AddressInfo {
return this.internal;
}
public void setInternal(boolean internal) {
public AddressInfo setInternal(boolean internal) {
this.internal = internal;
return this;
}
public AddressInfo create(SimpleString name, RoutingType routingType) {
@ -318,6 +321,15 @@ public class AddressInfo {
return unRoutedMessageCountUpdater.get(this);
}
public HierarchicalRepositoryChangeListener getRepositoryChangeListener() {
return repositoryChangeListener;
}
public AddressInfo setRepositoryChangeListener(HierarchicalRepositoryChangeListener repositoryChangeListener) {
this.repositoryChangeListener = repositoryChangeListener;
return this;
}
public void registerMeters(MetricsManager metricsManager) {
if (metricsManager != null) {
metricsManager.registerAddressGauge(name.toString(), builder -> {

View File

@ -130,7 +130,7 @@ public class DivertImpl implements Divert {
copy = message;
}
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false), false);
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
}
@Override

View File

@ -38,8 +38,7 @@ public interface JournalLoader {
void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
List<QueueBindingInfo> queueBindingInfos) throws Exception;
void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfo) throws Exception;
void initAddresses(List<AddressBindingInfo> addressBindingInfo) throws Exception;
void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception;

View File

@ -186,12 +186,9 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
@Override
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfos) throws Exception {
public void initAddresses(List<AddressBindingInfo> addressBindingInfos) throws Exception {
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
addressInfo.setId(addressBindingInfo.getId());
if (addressBindingInfo.getAddressStatusEncoding() != null && addressBindingInfo.getAddressStatusEncoding().getStatus() == AddressQueueStatus.PAUSED) {

View File

@ -1954,6 +1954,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private int iterQueue(final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction) throws Exception {
return iterQueue(flushLimit, filter1, messageAction, true);
}
/**
* This is a generic method for any method interacting on the Queue to move or delete messages
* Instead of duplicate the feature we created an abstract class where you pass the logic for
* each message.
*
* @param filter1
* @param messageAction
* @return
* @throws Exception
*/
private synchronized int iterQueue(final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction,
final boolean remove) throws Exception {
int count = 0;
int txCount = 0;
@ -1974,7 +1991,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (filter1 == null || filter1.match(ref.getMessage())) {
messageAction.actMessage(tx, ref);
iter.remove();
if (remove) {
iter.remove();
}
txCount++;
count++;
}
@ -2399,6 +2418,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
});
}
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
RoutingContext routingContext = new RoutingContextImpl(tx);
routingContext.setAddress(server.locateQueue(queueName).getAddress());
server.getPostOffice().getBinding(queueName).route(ref.getMessage(), routingContext);
postOffice.processRoute(ref.getMessage(), routingContext, false);
}
}, false);
}
@Override
public int retryMessages(Filter filter) throws Exception {

View File

@ -196,11 +196,12 @@ public final class RoutingContextImpl implements RoutingContext {
}
@Override
public void setRoutingType(RoutingType routingType) {
public RoutingContext setRoutingType(RoutingType routingType) {
if (this.routingType == null || this.routingType != routingType) {
this.clear();
}
this.routingType = routingType;
return this;
}
@Override

View File

@ -282,7 +282,7 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public synchronized void registerDivert(final Divert divert, final DivertConfiguration config) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), config.getAddress());
DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
DivertControl divertControl = new DivertControlImpl(divert, storageManager, config, messagingServer.getInternalNamingPrefix());
registerInJMX(objectName, divertControl);
registerInRegistry(ResourceNames.DIVERT + config.getName(), divertControl);

View File

@ -185,6 +185,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Long defaultRingSize = null;
private Long retroactiveMessageCount = null;
private DeletionPolicy configDeleteQueues = null;
private Boolean autoCreateAddresses = null;
@ -759,6 +761,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public long getRetroactiveMessageCount() {
return retroactiveMessageCount != null ? retroactiveMessageCount : ActiveMQDefaultConfiguration.DEFAULT_RETROACTIVE_MESSAGE_COUNT;
}
public AddressSettings setRetroactiveMessageCount(final long defaultRetroactiveMessageCount) {
this.retroactiveMessageCount = defaultRetroactiveMessageCount;
return this;
}
/**
* merge 2 objects in to 1
*
@ -919,6 +930,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (defaultRingSize == null) {
defaultRingSize = merged.defaultRingSize;
}
if (retroactiveMessageCount == null) {
retroactiveMessageCount = merged.retroactiveMessageCount;
}
}
@Override
@ -1087,6 +1101,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultGroupFirstKey = buffer.readNullableSimpleString();
}
if (buffer.readableBytes() > 0) {
retroactiveMessageCount = BufferHelper.readNullableLong(buffer);
}
}
@Override
@ -1139,7 +1157,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
SimpleString.sizeofNullableString(defaultGroupFirstKey) +
BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount) +
BufferHelper.sizeOfNullableBoolean(autoDeleteCreatedQueues) +
BufferHelper.sizeOfNullableLong(defaultRingSize);
BufferHelper.sizeOfNullableLong(defaultRingSize) +
BufferHelper.sizeOfNullableLong(retroactiveMessageCount);
}
@Override
@ -1243,6 +1262,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableDouble(buffer, redeliveryCollisionAvoidanceFactor);
buffer.writeNullableSimpleString(defaultGroupFirstKey);
BufferHelper.writeNullableLong(buffer, retroactiveMessageCount);
}
/* (non-Javadoc)
@ -1303,6 +1324,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode());
result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode());
result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
result = prime * result + ((retroactiveMessageCount == null) ? 0 : retroactiveMessageCount.hashCode());
return result;
}
@ -1585,6 +1607,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!defaultRingSize.equals(other.defaultRingSize))
return false;
if (retroactiveMessageCount == null) {
if (other.retroactiveMessageCount != null)
return false;
} else if (!retroactiveMessageCount.equals(other.retroactiveMessageCount))
return false;
return true;
}
@ -1692,6 +1720,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultGroupFirstKey +
", defaultRingSize=" +
defaultRingSize +
", retroactiveMessageCount=" +
retroactiveMessageCount +
"]";
}
}

View File

@ -3398,6 +3398,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="retroactive-message-count" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the number of messages to preserve for future queues created on the matching address
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>

View File

@ -362,6 +362,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a1").getDefaultQueueRoutingType());
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a1").getDefaultAddressRoutingType());
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
@ -386,6 +387,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());

View File

@ -0,0 +1,52 @@
/*
* Copyright The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.EnumSet;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
public class AddressBindingEncodingTest extends Assert {
@Test
public void testEncodeDecode() {
final SimpleString name = RandomUtil.randomSimpleString();
final boolean autoCreated = RandomUtil.randomBoolean();
final EnumSet<RoutingType> routingTypes = EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST);
PersistentAddressBindingEncoding encoding = new PersistentAddressBindingEncoding(name,
routingTypes,
autoCreated);
int size = encoding.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size);
encoding.encode(encodedBuffer);
PersistentAddressBindingEncoding decoding = new PersistentAddressBindingEncoding();
decoding.decode(encodedBuffer);
assertEquals(name, decoding.getName());
assertEquals(autoCreated, decoding.autoCreated);
assertEquals(routingTypes, decoding.routingTypes);
}
}

View File

@ -431,6 +431,7 @@
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-consumer-window-size>10000</default-consumer-window-size>
<retroactive-message-count>10</retroactive-message-count>
</address-setting>
</address-settings>
<resource-limit-settings>

View File

@ -67,5 +67,6 @@
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-consumer-window-size>10000</default-consumer-window-size>
<retroactive-message-count>10</retroactive-message-count>
</address-setting>
</address-settings>

View File

@ -3364,6 +3364,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="retroactive-message-count" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the number of messages to preserve for future queues created on the matching address
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">
@ -3537,6 +3545,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="retroactive-message-count" type="xsd:string" use="optional" default="0">
<xsd:annotation>
<xsd:documentation>
The number of messages to preserve for future queues created on this address
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -40,6 +40,7 @@
* [Scheduled Messages](scheduled-messages.md)
* [Last-Value Queues](last-value-queues.md)
* [Ring Queues](ring-queues.md)
* [Retroactive Addresses](retroactive-addresses.md)
* [Exclusive Queues](exclusive-queues.md)
* [Message Grouping](message-grouping.md)
* [Consumer Priority](consumer-priority.md)

View File

@ -613,6 +613,7 @@ that would be found in the `broker.xml` file.
<default-queue-routing-type></default-queue-routing-type>
<default-address-routing-type></default-address-routing-type>
<default-ring-size>-1</default-ring-size>
<retroactive-message-count>0</retroactive-message-count>
</address-setting>
</address-settings>
```
@ -855,8 +856,13 @@ client and/or protocol semantics. Default is `MULTICAST`. Read more about
`default-consumer-window-size` defines the default `consumerWindowSize` value
for a `CORE` protocol consumer, if not defined the default will be set to
1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size
if the value is not set on the client. Read more about [flow control](#flow-control).
if the value is not set on the client. Read more about
[flow control](flow-control.md).
`default-ring-size` defines the default `ring-size` value for any matching queue
which doesn't have `ring-size` explicitly defined. If not defined the default will
be set to -1. Read more about [ring queues](#ring-queue).
be set to -1. Read more about [ring queues](ring-queues.md).
`retroactive-message-count` defines the number of messages to preserve for future
queues created on the matching address. Defaults to 0. Read more about
[retroactive addresses](retroactive-addresses.md).

View File

@ -248,6 +248,7 @@ Name | Description | Default
[default-queue-routing-type](address-model.md#routing-type) | Routing type for auto-created queues if the type can't be otherwise determined | `MULTICAST`
[default-address-routing-type](address-model.md#routing-type) | Routing type for auto-created addresses if the type can't be otherwise determined | `MULTICAST`
[default-ring-size](ring-queues.md) | The ring-size applied to queues without an explicit `ring-size` configured | `-1`
[retroactive-message-count](retroactive-addresses.md) | the number of messages to preserve for future queues created on the matching address | `0`
## bridge type

View File

@ -0,0 +1,88 @@
# Retroactive Addresses
A "retroactive" address is an address that will preserve messages sent to it
for queues which will be created on it in the future. This can be useful in,
for example, publish-subscribe use cases where clients want to receive the
messages sent to the address *before* they they actually connected and created
their multicast "subscription" queue. Typically messages sent to an address
before a queue was created on it would simply be unavailable to those queues,
but with a retroactive address a fixed number of messages can be preserved by
the broker and automatically copied into queues subsequently created on the
address. This works for both anycast and multicast queues.
## Internal Retroactive Resources
To implement this functionality the broker will create 4 internal resources for
each retroactive address:
1. A non-exclusive [divert](#diverts) to grab the messages from the retroactive
address.
2. An address to receive the messages from the divert.
3. **Two** [ring queues](#ring-queues) to hold the messages sent to the address
by the divert - one for anycast and one for multicast. The general caveats
for ring queues still apply here. See [the chapter on ring queues](#ring-queues)
for more details.
These resources are important to be aware of as they will show up in the web
console and other management or metric views. They will be named according to
the following pattern:
```
<internal-naming-prefix><delimiter><source-address><delimiter>(divert|address|queue<delimiter>(anycast|multicast))<delimiter>retro
```
For example, if an address named `myAddress` had a `retroactive-message-count`
of 10 and the default `internal-naming-prefix` (i.e. `$.artemis.internal.`) and
the default delimiter (i.e. `.`) were being used then resources with these names
would be created:
1. A divert on `myAddress` named `$.artemis.internal.myAddress.divert.retro`
2. An address named `$.artemis.internal.myAddress.address.retro`
3. A multicast queue on the address from step #2 named
`$.artemis.internal.myAddress.queue.multicast.retro` with a `ring-size` of 10.
4. An anycast queue on the address from step #2 named
`$.artemis.internal.myAddress.queue.anycast.retro` with a `ring-size` of 10.
This pattern is important to note as it allows one to configure address-settings
if necessary. To configure custom address-settings you'd use a match like:
```
*.*.*.<source-address>.*.retro
```
Using the same example as above the `match` would be:
```
*.*.*.myAddress.*.retro
```
> Note:
>
> Changing the broker's `internal-naming-prefix` once these retroactive
> resources are created will break the retroactive functionality.
>
## Configuration
To configure an address to be "retroactive" simply configure the
`retroactive-message-count` `address-setting` to reflect the number of messages
you want the broker to preserve, e.g.:
```xml
<address-settings>
<address-setting match="orders">
<retroactive-message-count>100</retroactive-message-count>
</address-setting>
</address-settings>
```
The value for `retroactive-message-count` can be updated at runtime either via
`broker.xml` or via the management API just like any other address-setting.
However, if you *reduce* the value of `retroactive-message-count` an additional
administrative step will be required since this functionality is implemented
via ring queues. This is because a ring queue whose ring-size is reduced will
not automatically delete messages from the queue to meet the new ring-size in
order to avoid unintended message loss. Therefore, administrative action will
be required in this case to manually reduce the number of messages in the ring
queue via the management API.

View File

@ -28,14 +28,12 @@ import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
@ -44,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
@ -92,7 +91,7 @@ public class RequestReplyMultiProtocolTest extends OpenWireTestBase {
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
this.server.addAddressInfo(info);
}

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
@ -30,10 +24,12 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
@ -48,6 +44,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
public class RequestReplyNonJMSTest extends OpenWireTestBase {
@ -84,7 +82,7 @@ public class RequestReplyNonJMSTest extends OpenWireTestBase {
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
this.server.addAddressInfo(info);
}
@Test

View File

@ -727,7 +727,6 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
boolean autoDeleteJmsQueues = RandomUtil.randomBoolean();
boolean autoCreateJmsTopics = RandomUtil.randomBoolean();
boolean autoDeleteJmsTopics = RandomUtil.randomBoolean();
boolean autoCreateQueues = RandomUtil.randomBoolean();
boolean autoDeleteQueues = RandomUtil.randomBoolean();
boolean autoCreateAddresses = RandomUtil.randomBoolean();
@ -754,6 +753,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
long autoDeleteQueuesMessageCount = RandomUtil.randomPositiveLong();
long autoDeleteAddressesDelay = RandomUtil.randomPositiveLong();
double redeliveryCollisionAvoidanceFactor = RandomUtil.randomDouble();
long retroactiveMessageCount = RandomUtil.randomPositiveLong();
serverControl.addAddressSettings(addressMatch,
DLA,
@ -802,7 +802,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor);
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
boolean ex = false;
try {
@ -853,7 +854,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor);
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
} catch (Exception expected) {
ex = true;
}
@ -911,6 +913,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteQueuesMessageCount, info.getAutoDeleteQueuesMessageCount());
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
serverControl.addAddressSettings(addressMatch,
DLA,
@ -959,7 +962,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor);
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
info = AddressSettingsInfo.from(jsonString);
@ -1010,6 +1014,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
assertEquals(autoDeleteQueuesMessageCount, info.getAutoDeleteQueuesMessageCount());
assertEquals(autoDeleteAddressesDelay, info.getAutoDeleteAddressesDelay());
assertEquals(redeliveryCollisionAvoidanceFactor, info.getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(retroactiveMessageCount, info.getRetroactiveMessageCount());
ex = false;
try {
@ -1060,7 +1065,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor);
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
} catch (Exception e) {
ex = true;
}

View File

@ -954,7 +954,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Parameter(desc = "delay for deleting auto-created queues", name = "autoDeleteQueuesDelay") long autoDeleteQueuesDelay,
@Parameter(desc = "the message count the queue must be at or below before it can be auto deleted", name = "autoDeleteQueuesMessageCount") long autoDeleteQueuesMessageCount,
@Parameter(desc = "delay for deleting auto-created addresses", name = "autoDeleteAddressesDelay") long autoDeleteAddressesDelay,
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor) throws Exception {
@Parameter(desc = "factor by which to modify the redelivery delay slightly to avoid collisions", name = "redeliveryCollisionAvoidanceFactor") double redeliveryCollisionAvoidanceFactor,
@Parameter(desc = "the number of messages to preserve for future queues created on the matching address", name = "retroactiveMessageCount") long retroactiveMessageCount) throws Exception {
proxy.invokeOperation("addAddressSettings",
addressMatch,
DLA,
@ -1003,7 +1004,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
autoDeleteQueuesDelay,
autoDeleteQueuesMessageCount,
autoDeleteAddressesDelay,
redeliveryCollisionAvoidanceFactor);
redeliveryCollisionAvoidanceFactor,
retroactiveMessageCount);
}
@Override

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.core.management.RoleInfo;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.CheckType;
@ -84,6 +85,18 @@ public class AddressControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testIsRetroactiveResource() throws Exception {
SimpleString baseAddress = RandomUtil.randomSimpleString();
SimpleString address = ResourceNames.getRetroactiveResourceAddressName(server.getInternalNamingPrefix(), server.getConfiguration().getWildcardConfiguration().getDelimiterString(), baseAddress);
session.createAddress(address, RoutingType.MULTICAST, false);
AddressControl addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.isRetroactiveResource());
}
@Test
public void testGetQueueNames() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -133,6 +133,11 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (boolean) proxy.retrieveAttributeValue("paused");
}
@Override
public boolean isRetroactiveResource() {
return (boolean) proxy.retrieveAttributeValue("retroactiveResource");
}
@Override
public String sendMessage(Map<String, String> headers,
int type,

View File

@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
@ -68,6 +70,28 @@ public class DivertControlTest extends ManagementTestBase {
Assert.assertEquals(divertConfig.getTransformerConfiguration().getProperties(), divertControl.getTransformerProperties());
}
@Test
public void testRetroactiveResourceAttribute() throws Exception {
String address = RandomUtil.randomString();
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration().setAddress(RandomUtil.randomString()).setName(RandomUtil.randomString()).setDurable(false);
CoreQueueConfiguration forwardQueueConfig = new CoreQueueConfiguration().setAddress(address).setName(RandomUtil.randomString()).setDurable(false);
divertConfig = new DivertConfiguration()
.setName(ResourceNames.getRetroactiveResourceDivertName(server.getInternalNamingPrefix(), server.getConfiguration().getWildcardConfiguration().getDelimiterString(), SimpleString.toSimpleString(address)).toString())
.setRoutingName(RandomUtil.randomString()).setAddress(queueConfig.getAddress())
.setForwardingAddress(forwardQueueConfig.getAddress())
.setExclusive(RandomUtil.randomBoolean())
.setTransformerConfiguration(new TransformerConfiguration(AddHeadersTransformer.class.getName()));
server.deployDivert(divertConfig);
checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(divertConfig.getName(), divertConfig.getAddress()));
DivertControl divertControl = createDivertManagementControl(divertConfig.getName(), divertConfig.getAddress());
Assert.assertTrue(divertControl.isRetroactiveResource());
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -88,6 +88,11 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
return (Boolean) proxy.retrieveAttributeValue("exclusive");
}
@Override
public boolean isRetroactiveResource() {
return (Boolean) proxy.retrieveAttributeValue("retroactiveResource");
}
};
}

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.management.DayCounterInfo;
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@ -67,8 +68,8 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
@ -121,6 +122,27 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testRetroactiveResourceAttribute() throws Exception {
SimpleString baseAddress = RandomUtil.randomSimpleString();
String internalNamingPrefix = server.getInternalNamingPrefix();
String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
SimpleString address = ResourceNames.getRetroactiveResourceAddressName(internalNamingPrefix, delimiter, baseAddress);
SimpleString multicastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, baseAddress, RoutingType.MULTICAST);
SimpleString anycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, baseAddress, RoutingType.ANYCAST);
session.createQueue(address, RoutingType.MULTICAST, multicastQueue, null, durable);
session.createQueue(address, RoutingType.MULTICAST, anycastQueue, null, durable);
QueueControl queueControl = createManagementControl(address, multicastQueue);
Assert.assertTrue(queueControl.isRetroactiveResource());
queueControl = createManagementControl(address, anycastQueue);
Assert.assertTrue(queueControl.isRetroactiveResource());
session.deleteQueue(multicastQueue);
session.deleteQueue(anycastQueue);
}
@Test
public void testGetNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -310,6 +310,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Boolean) proxy.retrieveAttributeValue("temporary");
}
@Override
public boolean isRetroactiveResource() {
return (Boolean) proxy.retrieveAttributeValue("retroactiveResource");
}
@Override
public String listMessageCounter() throws Exception {
return (String) proxy.invokeOperation("listMessageCounter");

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Before;
import org.junit.Test;
public class RetroactiveAddressFailoverTest extends FailoverTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected ServerLocator locator;
protected ClientSessionFactoryInternal sf;
String internalNamingPrefix = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX;
String delimiter = ".";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(300).setRetryInterval(100);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
}
@Test
public void testFailover() throws Exception {
final int MESSAGE_COUNT = 10;
final int OFFSET = 5;
ActiveMQServer live = liveServer.getServer();
ActiveMQServer backup = backupServer.getServer();
ClientSession session = addClientSession(sf.createSession(true, true));
final SimpleString queueName = SimpleString.toSimpleString("simpleQueue");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
live.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(MESSAGE_COUNT));
backup.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(MESSAGE_COUNT));
live.addAddressInfo(new AddressInfo(addressName));
ClientProducer producer = addClientProducer(session.createProducer(addressName));
for (int j = 0; j < OFFSET; j++) {
ClientMessage message = session.createMessage(true);
message.putIntProperty("xxx", j);
producer.send(message);
}
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> live.locateQueue(divertQueue).getMessageCount() == OFFSET, 3000, 50);
crash(session);
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == OFFSET, 3000, 50);
for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) {
ClientMessage message = session.createMessage(true);
message.putIntProperty("xxx", j);
producer.send(message);
}
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 3000, 50);
session.createQueue(addressName.toString(), RoutingType.ANYCAST, queueName.toString());
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName) != null);
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName);
for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) {
session.start();
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(j, (int) message.getIntProperty("xxx"));
}
consumer.close();
session.deleteQueue(queueName);
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
}

View File

@ -0,0 +1,476 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class RetroactiveAddressTest extends ActiveMQTestBase {
protected ActiveMQServer server;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
String internalNamingPrefix;
char delimiterChar;
String delimiter;
@Parameterized.Parameters(name = "delimiterChar={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][] {{'/'}, {'.'}});
}
public RetroactiveAddressTest(char delimiterChar) {
super();
this.delimiterChar = delimiterChar;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, createDefaultInVMConfig());
server.getConfiguration().setInternalNamingPrefix(ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX.replace('.', delimiterChar));
server.getConfiguration().getWildcardConfiguration().setDelimiter(delimiterChar);
server.start();
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
internalNamingPrefix = server.getConfiguration().getInternalNamingPrefix();
delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
}
@Test
public void testRetroactiveResourceCreation() throws Exception {
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertAddress = ResourceNames.getRetroactiveResourceAddressName(internalNamingPrefix, delimiter, addressName);
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
final SimpleString divertAnycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.ANYCAST);
final SimpleString divert = ResourceNames.getRetroactiveResourceDivertName(internalNamingPrefix, delimiter, addressName);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
assertNotNull(server.getAddressInfo(divertAddress));
assertNotNull(server.locateQueue(divertMulticastQueue));
assertEquals(RoutingType.MULTICAST, server.locateQueue(divertMulticastQueue).getRoutingType());
assertNotNull(server.locateQueue(divertAnycastQueue));
assertEquals(RoutingType.ANYCAST, server.locateQueue(divertAnycastQueue).getRoutingType());
assertNotNull(server.getPostOffice().getBinding(divert));
}
@Test
public void testRetroactiveResourceRemoval() throws Exception {
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertAddress = ResourceNames.getRetroactiveResourceAddressName(internalNamingPrefix, delimiter, addressName);
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
final SimpleString divertAnycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.ANYCAST);
final SimpleString divert = ResourceNames.getRetroactiveResourceDivertName(internalNamingPrefix, delimiter, addressName);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
assertNotNull(server.getAddressInfo(divertAddress));
assertNotNull(server.locateQueue(divertMulticastQueue));
assertNotNull(server.locateQueue(divertAnycastQueue));
assertNotNull(server.getPostOffice().getBinding(divert));
server.removeAddressInfo(addressName, null, true);
assertNull(server.getAddressInfo(divertAddress));
assertNull(server.locateQueue(divertAnycastQueue));
assertNull(server.locateQueue(divertMulticastQueue));
assertNull(server.getPostOffice().getBinding(divert));
}
@Test
public void testRetroactiveAddress() throws Exception {
final int COUNT = 15;
final int LOOPS = 25;
final SimpleString queueName = SimpleString.toSimpleString("simpleQueue");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName));
for (int i = 0; i < LOOPS; i++) {
ClientProducer producer = session.createProducer(addressName);
for (int j = 0; j < COUNT; j++) {
ClientMessage message = session.createMessage(false);
message.putIntProperty("xxx", (i * COUNT) + j);
producer.send(message);
}
producer.close();
final int finalI = i;
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessagesReplaced() == (COUNT * finalI), 3000, 50);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT, 3000, 50);
session.createQueue(addressName.toString(), RoutingType.ANYCAST, queueName.toString());
Wait.assertTrue(() -> server.locateQueue(queueName) != null);
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == COUNT, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName);
for (int j = 0; j < COUNT; j++) {
session.start();
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals((i * COUNT) + j, (int) message.getIntProperty("xxx"));
}
consumer.close();
session.deleteQueue(queueName);
}
}
@Test
public void testRestart() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final SimpleString queueName1 = SimpleString.toSimpleString("simpleQueue1");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(data + "1");
producer.send(message);
producer.close();
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50);
server.stop();
server.start();
assertNotNull(server.locateQueue(divertMulticastQueue));
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
producer = session.createProducer(addressName);
message = session.createMessage(true);
message.getBodyBuffer().writeString(data + "2");
producer.send(message);
producer.close();
session.createQueue(addressName.toString(), RoutingType.ANYCAST, queueName1.toString());
Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 2, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName1);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(data + "1", message.getBodyBuffer().readString());
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(data + "2", message.getBodyBuffer().readString());
consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 2, 2000, 100);
}
@Test
public void testUpdateAfterRestart() throws Exception {
final int COUNT = 10;
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertAnycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.ANYCAST);
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100);
server.stop();
server.start();
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2, 1000, 100);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2, 1000, 100);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100);
}
@Test
public void testMulticast() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final SimpleString queueName1 = SimpleString.toSimpleString("simpleQueue1");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(false);
message.getBodyBuffer().writeString(data);
message.setRoutingType(RoutingType.MULTICAST);
producer.send(message);
producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1, 500, 50);
session.createQueue(addressName.toString(), RoutingType.MULTICAST, queueName1.toString());
Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName1);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString());
consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1, 2000, 100);
}
@Test
public void testJMSTopicSubscribers() throws Exception {
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final int COUNT = 10;
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName));
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
Connection c = cf.createConnection();
Session s = c.createSession();
Topic t = s.createTopic(addressName.toString());
MessageProducer producer = s.createProducer(t);
for (int i = 0; i < COUNT * 2; i++) {
Message m = s.createMessage();
m.setIntProperty("test", i);
producer.send(m);
}
producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT, 500, 50);
MessageConsumer consumer = s.createConsumer(t);
c.start();
for (int i = 0; i < COUNT; i++) {
Message m = consumer.receive(500);
assertNotNull(m);
assertEquals(i + COUNT, m.getIntProperty("test"));
}
assertNull(consumer.receiveNoWait());
}
@Test
public void testUpdateAddressSettings() throws Exception {
final int COUNT = 10;
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertAnycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.ANYCAST);
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName));
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2, 1000, 100);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2, 1000, 100);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100);
}
@Test
public void testRoutingTypes() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final SimpleString multicastQueue = SimpleString.toSimpleString("multicastQueue");
final SimpleString anycastQueue = SimpleString.toSimpleString("anycastQueue");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
final SimpleString divertAnycastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.ANYCAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(false);
message.getBodyBuffer().writeString(data + RoutingType.MULTICAST.toString());
message.setRoutingType(RoutingType.MULTICAST);
producer.send(message);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 0, 500, 50);
message = session.createMessage(false);
message.getBodyBuffer().writeString(data + RoutingType.ANYCAST.toString());
message.setRoutingType(RoutingType.ANYCAST);
producer.send(message);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1, 500, 50);
producer.close();
session.createQueue(addressName.toString(), RoutingType.MULTICAST, multicastQueue.toString());
Wait.assertTrue(() -> server.locateQueue(multicastQueue) != null);
Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 1, 500, 50);
session.createQueue(addressName.toString(), RoutingType.ANYCAST, anycastQueue.toString());
Wait.assertTrue(() -> server.locateQueue(anycastQueue) != null);
Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 1, 500, 50);
ClientConsumer consumer = session.createConsumer(multicastQueue);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(data + RoutingType.MULTICAST.toString(), message.getBodyBuffer().readString());
consumer.close();
Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 2000, 100);
consumer.close();
consumer = session.createConsumer(anycastQueue);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(data + RoutingType.ANYCAST.toString(), message.getBodyBuffer().readString());
consumer.close();
Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1, 2000, 100);
}
@Test
public void testFilter() throws Exception {
final SimpleString queueName1 = SimpleString.toSimpleString("simpleQueue1");
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(false);
message.putLongProperty("xxx", 5);
producer.send(message);
message = session.createMessage(false);
message.putLongProperty("xxx", 15);
producer.send(message);
producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2, 500, 50);
session.createQueue(addressName.toString(), RoutingType.MULTICAST, queueName1.toString(), "xxx > 10", false);
Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName1);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
assertEquals(15, (long) message.getLongProperty("xxx"));
consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2, 2000, 100);
}
@Test
public void testAddressSettingOnRetroactiveResource() throws Exception {
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertAddress = ResourceNames.getRetroactiveResourceAddressName(internalNamingPrefix, delimiter, addressName);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
server.addAddressInfo(new AddressInfo(addressName));
assertEquals(-1, server.getAddressSettingsRepository().getMatch(divertAddress.toString()).getMaxSizeBytes());
server.getAddressSettingsRepository().addMatch("*" + delimiter + "*" + delimiter + "*" + delimiter + addressName + delimiter + "*" + delimiter + ResourceNames.RETROACTIVE_SUFFIX, new AddressSettings().setMaxSizeBytes(13));
assertEquals(13, server.getAddressSettingsRepository().getMatch(divertAddress.toString()).getMaxSizeBytes());
}
@Test
public void testPaging() throws Exception {
final SimpleString queueName = SimpleString.toSimpleString("simpleQueue");
final SimpleString randomQueueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
final SimpleString addressName = SimpleString.toSimpleString("myAddress");
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
final int MESSAGE_COUNT = 20;
final int MESSAGE_SIZE = 1024;
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(MESSAGE_COUNT).setMaxSizeBytes(1024 * 20).setPageSizeBytes(1024 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
server.addAddressInfo(new AddressInfo(addressName));
server.createQueue(addressName, RoutingType.MULTICAST, randomQueueName, null, true, false);
ClientProducer producer = session.createProducer(addressName);
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < MESSAGE_COUNT * 2; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
producer.send(message);
}
producer.close();
Wait.assertTrue(() -> server.locateQueue(randomQueueName).getMessageCount() == MESSAGE_COUNT * 2, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 500, 50);
session.createQueue(addressName.toString(), RoutingType.MULTICAST, queueName.toString());
Wait.assertTrue(() -> server.locateQueue(queueName) != null);
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT, 500, 50);
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
}
consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == 0, 500, 50);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 2000, 100);
}
}

View File

@ -50,8 +50,7 @@ public class FakeJournalLoader implements JournalLoader {
}
@Override
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfo) throws Exception {
public void initAddresses(List<AddressBindingInfo> addressBindingInfo) throws Exception {
}
@Override