This commit is contained in:
Clebert Suconic 2017-08-30 22:59:31 -04:00
commit 65220a86c3
12 changed files with 220 additions and 91 deletions

View File

@ -62,9 +62,13 @@ public interface AddressManager {
* @param addressInfo
* @return true if the address was added, false if it wasn't added
*/
boolean addAddressInfo(AddressInfo addressInfo);
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes);
boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception;
/** it will return null if there are no updates.
* it will throw an exception if the address doesn't exist */
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes) throws Exception;
AddressInfo removeAddressInfo(SimpleString address);

View File

@ -49,7 +49,11 @@ public interface PostOffice extends ActiveMQComponent {
* @param addressInfo
* @return true if the address was added, false if it wasn't added
*/
boolean addAddressInfo(AddressInfo addressInfo);
boolean addAddressInfo(AddressInfo addressInfo) throws Exception;
default void reloadAddressInfo(AddressInfo addressInfo) throws Exception {
addAddressInfo(addressInfo);
}
AddressInfo removeAddressInfo(SimpleString address) throws Exception;

View File

@ -158,9 +158,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
this.reaperPriority = reaperPriority;
if (wildcardConfiguration.isEnabled()) {
addressManager = new WildcardAddressManager(this, wildcardConfiguration);
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager);
} else {
addressManager = new SimpleAddressManager(this, wildcardConfiguration);
addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager);
}
this.idCacheSize = idCacheSize;
@ -422,9 +422,23 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// PostOffice implementation -----------------------------------------------
@Override
public boolean addAddressInfo(AddressInfo addressInfo) {
public void reloadAddressInfo(AddressInfo addressInfo) throws Exception {
internalAddressInfo(addressInfo, true);
}
@Override
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
return internalAddressInfo(addressInfo, false);
}
private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception {
synchronized (addressLock) {
boolean result = addressManager.addAddressInfo(addressInfo);
boolean result;
if (reload) {
result = addressManager.reloadAddressInfo(addressInfo);
} else {
result = addressManager.addAddressInfo(addressInfo);
}
// only register address if it is new
if (result) {
try {
@ -437,24 +451,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
/** used on update queue, to validate when a value needs update */
private static int replaceNull(Integer value) {
if (value == null) {
return -1;
} else {
return value.intValue();
}
}
/** used on update queue, to validate when a value needs update */
private static boolean replaceNull(Boolean value) {
if (value == null) {
return false;
} else {
return value.booleanValue();
}
}
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
@ -468,13 +464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final Queue queue = queueBinding.getQueue();
if (queue.getRoutingType() == routingType && replaceNull(maxConsumers) == replaceNull(queue.getMaxConsumers()) && queue.isPurgeOnNoConsumers() == replaceNull(purgeOnNoConsumers)) {
if (logger.isTraceEnabled()) {
logger.tracef("Queue " + name + " didn't need to be updated");
}
return queueBinding;
}
boolean changed = false;
//validate update
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
@ -493,24 +483,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
//atomic update
if (maxConsumers != null) {
if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
changed = true;
queue.setMaxConsumer(maxConsumers);
}
if (routingType != null) {
if (routingType != null && queue.getRoutingType() != routingType) {
changed = true;
queue.setRoutingType(routingType);
}
if (purgeOnNoConsumers != null) {
if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
changed = true;
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
}
final long txID = storageManager.generateID();
try {
storageManager.updateQueueBinding(txID, queueBinding);
storageManager.commitBindings(txID);
} catch (Throwable throwable) {
storageManager.rollback(txID);
logger.warn(throwable.getMessage(), throwable);
throw throwable;
if (changed) {
final long txID = storageManager.generateID();
try {
storageManager.updateQueueBinding(txID, queueBinding);
storageManager.commitBindings(txID);
} catch (Throwable throwable) {
storageManager.rollback(txID);
logger.warn(throwable.getMessage(), throwable);
throw throwable;
}
}
return queueBinding;
@ -520,9 +515,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
synchronized (addressLock) {
return addressManager.updateAddressInfo(addressName, routingTypes);
}
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -48,6 +50,8 @@ public class SimpleAddressManager implements AddressManager {
private final ConcurrentMap<SimpleString, AddressInfo> addressInfoMap = new ConcurrentHashMap<>();
private final StorageManager storageManager;
/**
* HashMap<Address, Binding>
*/
@ -62,13 +66,16 @@ public class SimpleAddressManager implements AddressManager {
protected final WildcardConfiguration wildcardConfiguration;
public SimpleAddressManager(final BindingsFactory bindingsFactory) {
this(bindingsFactory, new WildcardConfiguration());
public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager) {
this(bindingsFactory, new WildcardConfiguration(), storageManager);
}
public SimpleAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
public SimpleAddressManager(final BindingsFactory bindingsFactory,
final WildcardConfiguration wildcardConfiguration,
final StorageManager storageManager) {
this.wildcardConfiguration = wildcardConfiguration;
this.bindingsFactory = bindingsFactory;
this.storageManager = storageManager;
}
@Override
@ -134,8 +141,7 @@ public class SimpleAddressManager implements AddressManager {
Binding binding = getBinding(address);
if (binding == null || !(binding instanceof LocalQueueBinding)
|| !binding.getAddress().equals(address)) {
if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(address)) {
Bindings bindings = mappings.get(address);
if (bindings != null) {
for (Binding theBinding : bindings.getBindings()) {
@ -151,7 +157,9 @@ public class SimpleAddressManager implements AddressManager {
}
@Override
public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception {
public SimpleString getMatchingQueue(final SimpleString address,
final SimpleString queueName,
RoutingType routingType) throws Exception {
Binding binding = getBinding(queueName);
if (binding != null && !binding.getAddress().equals(address) && !address.toString().isEmpty()) {
@ -225,23 +233,80 @@ public class SimpleAddressManager implements AddressManager {
}
@Override
public boolean addAddressInfo(AddressInfo addressInfo) {
public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception {
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
}
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) {
if (routingTypes == null || routingTypes.isEmpty()) {
return this.addressInfoMap.get(addressName);
} else {
return this.addressInfoMap.computeIfPresent(addressName, (name, oldAddressInfo) -> {
validateRoutingTypes(name, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
return oldAddressInfo;
});
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
boolean added = reloadAddressInfo(addressInfo);
if (added && storageManager != null) {
long txID = storageManager.generateID();
try {
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} catch (Exception e) {
try {
storageManager.rollbackBindings(txID);
} catch (Exception ignored) {
}
throw e;
}
}
return added;
}
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
AddressInfo info = addressInfoMap.get(addressName);
if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
if (routingTypes == null || isEquals(routingTypes, info.getRoutingTypes())) {
// there are no changes.. we just give up now
return info;
}
validateRoutingTypes(addressName, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
info.setRoutingTypes(updatedRoutingTypes);
if (storageManager != null) {
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, info.getId());
storageManager.addAddressBinding(txID, info);
storageManager.commitBindings(txID);
} catch (Exception e) {
try {
storageManager.rollbackBindings(txID);
} catch (Throwable ignored) {
}
throw e;
}
}
return info;
}
private boolean isEquals(Collection<RoutingType> set1, Collection<RoutingType> set2) {
Set<RoutingType> eset1 = set1 == null || set1.isEmpty() ? Collections.emptySet() : EnumSet.copyOf(set1);
Set<RoutingType> eset2 = set2 == null || set2.isEmpty() ? Collections.emptySet() : EnumSet.copyOf(set2);
if (eset1.size() == 0 && eset2.size() == 0) {
return true;
}
if (eset1.size() != eset2.size()) {
return false;
}
return eset2.containsAll(eset1);
}
private void validateRoutingTypes(SimpleString addressName, Collection<RoutingType> routingTypes) {

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -42,12 +43,13 @@ public class WildcardAddressManager extends SimpleAddressManager {
private final Map<SimpleString, Address> wildCardAddresses = new ConcurrentHashMap<>();
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration) {
super(bindingsFactory, wildcardConfiguration);
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration, final
StorageManager storageManager) {
super(bindingsFactory, wildcardConfiguration, storageManager);
}
public WildcardAddressManager(final BindingsFactory bindingsFactory) {
super(bindingsFactory);
public WildcardAddressManager(final BindingsFactory bindingsFactory, StorageManager storageManager) {
super(bindingsFactory, storageManager);
}
@Override

View File

@ -43,7 +43,7 @@ public interface QueueFactory {
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated);
boolean autoCreated) throws Exception;
/**
* This is required for delete-all-reference to work correctly with paging

View File

@ -2633,16 +2633,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(address, routingTypes);
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
postOffice.updateAddressInfo(address, routingTypes);
return true;
}
@ -2650,13 +2641,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {
boolean result = postOffice.addAddressInfo(addressInfo);
if (result) {
long txID = storageManager.generateID();
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} else {
result = false;
}
return result;
}

View File

@ -173,12 +173,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
@Override
public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap,
List<AddressBindingInfo> addressBindingInfos) throws Exception {
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
addressInfo.setId(addressBindingInfo.getId());
postOffice.addAddressInfo(addressInfo);
postOffice.reloadAddressInfo(addressInfo);
}
}

View File

@ -94,7 +94,7 @@ public class QueueFactoryImpl implements QueueFactory {
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
final boolean autoCreated) throws Exception {
// Add default address info if one doesn't exist
postOffice.addAddressInfo(new AddressInfo(address));

View File

@ -1715,6 +1715,19 @@ public abstract class ActiveMQTestBase extends Assert {
return recordsType;
}
protected HashMap<Integer, AtomicInteger> countBindingJournal(Configuration config) throws Exception {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1);
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
List<JournalFile> filesToRead = messagesJournal.orderFiles();
for (JournalFile file : filesToRead) {
JournalImpl.readJournalFile(messagesFF, file, new RecordTypeCounter(recordsType));
}
return recordsType;
}
/**
* This method will load a journal and count the living records
*

View File

@ -22,10 +22,17 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
@ -41,11 +48,9 @@ public class UpdateQueueTest extends ActiveMQTestBase {
server.start();
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null,
null, true, false);
long originalID = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, null, true, false).getID();
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -61,9 +66,10 @@ public class UpdateQueueTest extends ActiveMQTestBase {
factory.close();
server.stop();
server.start();
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
Queue queue = server.locateQueue(ADDRESS);
Assert.assertNotNull("queue not found", queue);
@ -86,7 +92,59 @@ public class UpdateQueueTest extends ActiveMQTestBase {
conn.close();
Assert.assertEquals(originalID, server.locateQueue(ADDRESS).getID());
// stopping, restarting to make sure the system will not create an extra record without an udpate
server.stop();
server.start();
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
server.stop();
}
private void validateBindingRecords(ActiveMQServer server, byte type, int expected) throws Exception {
HashMap<Integer, AtomicInteger> counts = countBindingJournal(server.getConfiguration());
// if this fails, don't ignore it.. it means something is sending a new record on the journal
// something is creating new records upon restart of the server.
// I really meant to have this fix, so don't ignore it if it fails
Assert.assertEquals(expected, counts.get((int) type).intValue());
}
@Test
public void testUpdateAddress() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
server.start();
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
AddressInfo infoAdded = new AddressInfo(ADDRESS, RoutingType.ANYCAST);
server.addAddressInfo(infoAdded);
server.updateAddressInfo(ADDRESS, infoAdded.getRoutingTypes());
server.stop();
server.start();
AddressInfo infoAfterRestart = server.getPostOffice().getAddressInfo(ADDRESS);
Assert.assertEquals(infoAdded.getId(), infoAfterRestart.getId());
Set<RoutingType> completeSet = new HashSet<>();
completeSet.add(RoutingType.ANYCAST);
completeSet.add(RoutingType.MULTICAST);
server.updateAddressInfo(ADDRESS, completeSet);
server.stop();
server.start();
infoAfterRestart = server.getPostOffice().getAddressInfo(ADDRESS);
// it was changed.. so new ID
Assert.assertNotEquals(infoAdded.getId(), infoAfterRestart.getId());
}
}

View File

@ -42,7 +42,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Test
public void testUnitOnWildCardFailingScenario() throws Exception {
int errors = 0;
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake());
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
ad.addBinding(new BindingFake("Topic1", "Topic1"));
ad.addBinding(new BindingFake("Topic1", "one"));
ad.addBinding(new BindingFake("*", "two"));
@ -72,7 +72,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Test
public void testUnitOnWildCardFailingScenarioFQQN() throws Exception {
int errors = 0;
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake());
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
ad.addBinding(new BindingFake("Topic1", "Topic1"));
ad.addBinding(new BindingFake("Topic1", "one"));
ad.addBinding(new BindingFake("*", "two"));