ARTEMIS-1378 Improving update Queues and Addresses
This commit is contained in:
parent
8e7e55543c
commit
6483e4a0aa
artemis-server/src
main/java/org/apache/activemq/artemis/core
test/java/org/apache/activemq/artemis/tests/util
tests
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client
unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl
|
@ -62,9 +62,13 @@ public interface AddressManager {
|
|||
* @param addressInfo
|
||||
* @return true if the address was added, false if it wasn't added
|
||||
*/
|
||||
boolean addAddressInfo(AddressInfo addressInfo);
|
||||
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes);
|
||||
boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
/** it will return null if there are no updates.
|
||||
* it will throw an exception if the address doesn't exist */
|
||||
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes) throws Exception;
|
||||
|
||||
AddressInfo removeAddressInfo(SimpleString address);
|
||||
|
||||
|
|
|
@ -49,7 +49,11 @@ public interface PostOffice extends ActiveMQComponent {
|
|||
* @param addressInfo
|
||||
* @return true if the address was added, false if it wasn't added
|
||||
*/
|
||||
boolean addAddressInfo(AddressInfo addressInfo);
|
||||
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
default void reloadAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
addAddressInfo(addressInfo);
|
||||
}
|
||||
|
||||
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
||||
|
||||
|
|
|
@ -158,9 +158,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
this.reaperPriority = reaperPriority;
|
||||
|
||||
if (wildcardConfiguration.isEnabled()) {
|
||||
addressManager = new WildcardAddressManager(this, wildcardConfiguration);
|
||||
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager);
|
||||
} else {
|
||||
addressManager = new SimpleAddressManager(this, wildcardConfiguration);
|
||||
addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager);
|
||||
}
|
||||
|
||||
this.idCacheSize = idCacheSize;
|
||||
|
@ -422,9 +422,23 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
// PostOffice implementation -----------------------------------------------
|
||||
|
||||
@Override
|
||||
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||
public void reloadAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
internalAddressInfo(addressInfo, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
return internalAddressInfo(addressInfo, false);
|
||||
}
|
||||
|
||||
private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
boolean result = addressManager.addAddressInfo(addressInfo);
|
||||
boolean result;
|
||||
if (reload) {
|
||||
result = addressManager.reloadAddressInfo(addressInfo);
|
||||
} else {
|
||||
result = addressManager.addAddressInfo(addressInfo);
|
||||
}
|
||||
// only register address if it is new
|
||||
if (result) {
|
||||
try {
|
||||
|
@ -437,24 +451,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
}
|
||||
|
||||
/** used on update queue, to validate when a value needs update */
|
||||
private static int replaceNull(Integer value) {
|
||||
if (value == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return value.intValue();
|
||||
}
|
||||
}
|
||||
|
||||
/** used on update queue, to validate when a value needs update */
|
||||
private static boolean replaceNull(Boolean value) {
|
||||
if (value == null) {
|
||||
return false;
|
||||
} else {
|
||||
return value.booleanValue();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueBinding updateQueue(SimpleString name,
|
||||
RoutingType routingType,
|
||||
|
@ -468,13 +464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
final Queue queue = queueBinding.getQueue();
|
||||
|
||||
if (queue.getRoutingType() == routingType && replaceNull(maxConsumers) == replaceNull(queue.getMaxConsumers()) && queue.isPurgeOnNoConsumers() == replaceNull(purgeOnNoConsumers)) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("Queue " + name + " didn't need to be updated");
|
||||
}
|
||||
return queueBinding;
|
||||
}
|
||||
boolean changed = false;
|
||||
|
||||
//validate update
|
||||
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
||||
|
@ -493,24 +483,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
|
||||
//atomic update
|
||||
if (maxConsumers != null) {
|
||||
if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
|
||||
changed = true;
|
||||
queue.setMaxConsumer(maxConsumers);
|
||||
}
|
||||
if (routingType != null) {
|
||||
if (routingType != null && queue.getRoutingType() != routingType) {
|
||||
changed = true;
|
||||
queue.setRoutingType(routingType);
|
||||
}
|
||||
if (purgeOnNoConsumers != null) {
|
||||
if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
|
||||
changed = true;
|
||||
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
|
||||
}
|
||||
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.updateQueueBinding(txID, queueBinding);
|
||||
storageManager.commitBindings(txID);
|
||||
} catch (Throwable throwable) {
|
||||
storageManager.rollback(txID);
|
||||
logger.warn(throwable.getMessage(), throwable);
|
||||
throw throwable;
|
||||
if (changed) {
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.updateQueueBinding(txID, queueBinding);
|
||||
storageManager.commitBindings(txID);
|
||||
} catch (Throwable throwable) {
|
||||
storageManager.rollback(txID);
|
||||
logger.warn(throwable.getMessage(), throwable);
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
|
||||
return queueBinding;
|
||||
|
@ -520,9 +515,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
@Override
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception {
|
||||
|
||||
synchronized (addressLock) {
|
||||
return addressManager.updateAddressInfo(addressName, routingTypes);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Address;
|
||||
import org.apache.activemq.artemis.core.postoffice.AddressManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
|
@ -48,6 +50,8 @@ public class SimpleAddressManager implements AddressManager {
|
|||
|
||||
private final ConcurrentMap<SimpleString, AddressInfo> addressInfoMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
/**
|
||||
* HashMap<Address, Binding>
|
||||
*/
|
||||
|
@ -62,13 +66,16 @@ public class SimpleAddressManager implements AddressManager {
|
|||
|
||||
protected final WildcardConfiguration wildcardConfiguration;
|
||||
|
||||
public SimpleAddressManager(final BindingsFactory bindingsFactory) {
|
||||
this(bindingsFactory, new WildcardConfiguration());
|
||||
public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager) {
|
||||
this(bindingsFactory, new WildcardConfiguration(), storageManager);
|
||||
}
|
||||
|
||||
public SimpleAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
|
||||
public SimpleAddressManager(final BindingsFactory bindingsFactory,
|
||||
final WildcardConfiguration wildcardConfiguration,
|
||||
final StorageManager storageManager) {
|
||||
this.wildcardConfiguration = wildcardConfiguration;
|
||||
this.bindingsFactory = bindingsFactory;
|
||||
this.storageManager = storageManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,8 +141,7 @@ public class SimpleAddressManager implements AddressManager {
|
|||
|
||||
Binding binding = getBinding(address);
|
||||
|
||||
if (binding == null || !(binding instanceof LocalQueueBinding)
|
||||
|| !binding.getAddress().equals(address)) {
|
||||
if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(address)) {
|
||||
Bindings bindings = mappings.get(address);
|
||||
if (bindings != null) {
|
||||
for (Binding theBinding : bindings.getBindings()) {
|
||||
|
@ -151,7 +157,9 @@ public class SimpleAddressManager implements AddressManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
|
||||
public SimpleString getMatchingQueue(final SimpleString address,
|
||||
final SimpleString queueName,
|
||||
RoutingType routingType) throws Exception {
|
||||
Binding binding = getBinding(queueName);
|
||||
|
||||
if (binding != null && !binding.getAddress().equals(address) && !address.toString().isEmpty()) {
|
||||
|
@ -225,23 +233,80 @@ public class SimpleAddressManager implements AddressManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||
public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) {
|
||||
if (routingTypes == null || routingTypes.isEmpty()) {
|
||||
return this.addressInfoMap.get(addressName);
|
||||
} else {
|
||||
return this.addressInfoMap.computeIfPresent(addressName, (name, oldAddressInfo) -> {
|
||||
validateRoutingTypes(name, routingTypes);
|
||||
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
|
||||
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
|
||||
return oldAddressInfo;
|
||||
});
|
||||
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
boolean added = reloadAddressInfo(addressInfo);
|
||||
if (added && storageManager != null) {
|
||||
long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.addAddressBinding(txID, addressInfo);
|
||||
storageManager.commitBindings(txID);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
storageManager.rollbackBindings(txID);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception {
|
||||
|
||||
AddressInfo info = addressInfoMap.get(addressName);
|
||||
|
||||
if (info == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||
}
|
||||
|
||||
if (routingTypes == null || isEquals(routingTypes, info.getRoutingTypes())) {
|
||||
// there are no changes.. we just give up now
|
||||
return info;
|
||||
}
|
||||
|
||||
validateRoutingTypes(addressName, routingTypes);
|
||||
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
|
||||
info.setRoutingTypes(updatedRoutingTypes);
|
||||
|
||||
|
||||
if (storageManager != null) {
|
||||
//it change the address info without any lock!
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.deleteAddressBinding(txID, info.getId());
|
||||
storageManager.addAddressBinding(txID, info);
|
||||
storageManager.commitBindings(txID);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
storageManager.rollbackBindings(txID);
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
private boolean isEquals(Collection<RoutingType> set1, Collection<RoutingType> set2) {
|
||||
Set<RoutingType> eset1 = set1 == null || set1.isEmpty() ? Collections.emptySet() : EnumSet.copyOf(set1);
|
||||
Set<RoutingType> eset2 = set2 == null || set2.isEmpty() ? Collections.emptySet() : EnumSet.copyOf(set2);
|
||||
|
||||
if (eset1.size() == 0 && eset2.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (eset1.size() != eset2.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return eset2.containsAll(eset1);
|
||||
}
|
||||
|
||||
private void validateRoutingTypes(SimpleString addressName, Collection<RoutingType> routingTypes) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Address;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -42,12 +43,13 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
|
||||
private final Map<SimpleString, Address> wildCardAddresses = new ConcurrentHashMap<>();
|
||||
|
||||
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
|
||||
super(bindingsFactory, wildcardConfiguration);
|
||||
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration, final
|
||||
StorageManager storageManager) {
|
||||
super(bindingsFactory, wildcardConfiguration, storageManager);
|
||||
}
|
||||
|
||||
public WildcardAddressManager(final BindingsFactory bindingsFactory) {
|
||||
super(bindingsFactory);
|
||||
public WildcardAddressManager(final BindingsFactory bindingsFactory, StorageManager storageManager) {
|
||||
super(bindingsFactory, storageManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,7 @@ public interface QueueFactory {
|
|||
SimpleString user,
|
||||
boolean durable,
|
||||
boolean temporary,
|
||||
boolean autoCreated);
|
||||
boolean autoCreated) throws Exception;
|
||||
|
||||
/**
|
||||
* This is required for delete-all-reference to work correctly with paging
|
||||
|
|
|
@ -2633,16 +2633,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
|
||||
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(address, routingTypes);
|
||||
//it change the address info without any lock!
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
|
||||
storageManager.addAddressBinding(txID, updatedAddressInfo);
|
||||
} finally {
|
||||
storageManager.commitBindings(txID);
|
||||
}
|
||||
|
||||
postOffice.updateAddressInfo(address, routingTypes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2650,13 +2641,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
boolean result = postOffice.addAddressInfo(addressInfo);
|
||||
|
||||
if (result) {
|
||||
long txID = storageManager.generateID();
|
||||
storageManager.addAddressBinding(txID, addressInfo);
|
||||
storageManager.commitBindings(txID);
|
||||
} else {
|
||||
result = false;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -173,12 +173,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
@Override
|
||||
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
|
||||
List<AddressBindingInfo> addressBindingInfos) throws Exception {
|
||||
|
||||
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
|
||||
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
|
||||
|
||||
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
|
||||
addressInfo.setId(addressBindingInfo.getId());
|
||||
postOffice.addAddressInfo(addressInfo);
|
||||
postOffice.reloadAddressInfo(addressInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated) {
|
||||
final boolean autoCreated) throws Exception {
|
||||
|
||||
// Add default address info if one doesn't exist
|
||||
postOffice.addAddressInfo(new AddressInfo(address));
|
||||
|
|
|
@ -1715,6 +1715,19 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return recordsType;
|
||||
}
|
||||
|
||||
protected HashMap<Integer, AtomicInteger> countBindingJournal(Configuration config) throws Exception {
|
||||
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
|
||||
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1);
|
||||
|
||||
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
|
||||
List<JournalFile> filesToRead = messagesJournal.orderFiles();
|
||||
|
||||
for (JournalFile file : filesToRead) {
|
||||
JournalImpl.readJournalFile(messagesFF, file, new RecordTypeCounter(recordsType));
|
||||
}
|
||||
return recordsType;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will load a journal and count the living records
|
||||
*
|
||||
|
|
|
@ -22,10 +22,17 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
|
@ -41,11 +48,9 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
|||
|
||||
server.start();
|
||||
|
||||
|
||||
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
|
||||
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null,
|
||||
null, true, false);
|
||||
long originalID = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, null, true, false).getID();
|
||||
|
||||
Connection conn = factory.createConnection();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -61,9 +66,10 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
|||
factory.close();
|
||||
|
||||
server.stop();
|
||||
|
||||
server.start();
|
||||
|
||||
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
|
||||
|
||||
Queue queue = server.locateQueue(ADDRESS);
|
||||
|
||||
Assert.assertNotNull("queue not found", queue);
|
||||
|
@ -86,7 +92,59 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
|||
|
||||
conn.close();
|
||||
|
||||
Assert.assertEquals(originalID, server.locateQueue(ADDRESS).getID());
|
||||
|
||||
// stopping, restarting to make sure the system will not create an extra record without an udpate
|
||||
server.stop();
|
||||
server.start();
|
||||
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
|
||||
server.stop();
|
||||
|
||||
}
|
||||
|
||||
private void validateBindingRecords(ActiveMQServer server, byte type, int expected) throws Exception {
|
||||
HashMap<Integer, AtomicInteger> counts = countBindingJournal(server.getConfiguration());
|
||||
// if this fails, don't ignore it.. it means something is sending a new record on the journal
|
||||
// something is creating new records upon restart of the server.
|
||||
// I really meant to have this fix, so don't ignore it if it fails
|
||||
Assert.assertEquals(expected, counts.get((int) type).intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateAddress() throws Exception {
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
|
||||
server.start();
|
||||
|
||||
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
|
||||
|
||||
AddressInfo infoAdded = new AddressInfo(ADDRESS, RoutingType.ANYCAST);
|
||||
|
||||
server.addAddressInfo(infoAdded);
|
||||
|
||||
server.updateAddressInfo(ADDRESS, infoAdded.getRoutingTypes());
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
AddressInfo infoAfterRestart = server.getPostOffice().getAddressInfo(ADDRESS);
|
||||
|
||||
Assert.assertEquals(infoAdded.getId(), infoAfterRestart.getId());
|
||||
|
||||
Set<RoutingType> completeSet = new HashSet<>();
|
||||
completeSet.add(RoutingType.ANYCAST);
|
||||
completeSet.add(RoutingType.MULTICAST);
|
||||
|
||||
server.updateAddressInfo(ADDRESS, completeSet);
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
infoAfterRestart = server.getPostOffice().getAddressInfo(ADDRESS);
|
||||
|
||||
// it was changed.. so new ID
|
||||
Assert.assertNotEquals(infoAdded.getId(), infoAfterRestart.getId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
|||
@Test
|
||||
public void testUnitOnWildCardFailingScenario() throws Exception {
|
||||
int errors = 0;
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake());
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
|
||||
ad.addBinding(new BindingFake("Topic1", "Topic1"));
|
||||
ad.addBinding(new BindingFake("Topic1", "one"));
|
||||
ad.addBinding(new BindingFake("*", "two"));
|
||||
|
@ -72,7 +72,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
|||
@Test
|
||||
public void testUnitOnWildCardFailingScenarioFQQN() throws Exception {
|
||||
int errors = 0;
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake());
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
|
||||
ad.addBinding(new BindingFake("Topic1", "Topic1"));
|
||||
ad.addBinding(new BindingFake("Topic1", "one"));
|
||||
ad.addBinding(new BindingFake("*", "two"));
|
||||
|
|
Loading…
Reference in New Issue