This closes #292

This commit is contained in:
Clebert Suconic 2016-01-05 10:44:13 -05:00
commit cdd800d2b7
2 changed files with 58 additions and 23 deletions

View File

@ -31,8 +31,6 @@ import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
/**
* A simple address manager that maintains the addresses and bindings.
@ -49,8 +47,6 @@ public class SimpleAddressManager implements AddressManager {
*/
private final ConcurrentMap<SimpleString, Binding> nameMap = new ConcurrentHashMap<>();
private final ConcurrentHashSet<SimpleString> pendingDeletes = new ConcurrentHashSet<>();
private final BindingsFactory bindingsFactory;
public SimpleAddressManager(final BindingsFactory bindingsFactory) {
@ -59,7 +55,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public boolean addBinding(final Binding binding) throws Exception {
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null || pendingDeletes.contains(binding.getUniqueName())) {
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null) {
throw ActiveMQMessageBundle.BUNDLE.bindingAlreadyExists(binding);
}
@ -78,24 +74,6 @@ public class SimpleAddressManager implements AddressManager {
return null;
}
if (tx != null) {
pendingDeletes.add(uniqueName);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
pendingDeletes.remove(uniqueName);
}
@Override
public void afterRollback(Transaction tx) {
nameMap.put(uniqueName, binding);
pendingDeletes.remove(uniqueName);
}
});
}
removeBindingInternal(binding.getAddress(), uniqueName);
return binding;

View File

@ -28,6 +28,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import java.util.List;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@ -107,6 +108,62 @@ public class DurableSubscriptionTest extends JMSTestCase {
}
}
// https://issues.apache.org/jira/browse/ARTEMIS-177
@Test
public void testDurableSubscriptionRemovalRaceCondition() throws Exception {
final String topicName = "myTopic";
final String clientID = "myClientID";
final String subscriptionName = "mySub";
createTopic(topicName);
InitialContext ic = getInitialContext();
Topic myTopic = (Topic) ic.lookup("/topic/" + topicName);
Connection conn = null;
for (int i = 0; i < 1000; i++) {
try {
conn = createConnection();
conn.setClientID(clientID);
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = s.createProducer(myTopic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
s.createDurableSubscriber(myTopic, subscriptionName);
prod.send(s.createTextMessage("k"));
conn.close();
destroyTopic(topicName);
createTopic(topicName);
conn = createConnection();
conn.setClientID(clientID);
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer durable = s.createDurableSubscriber(myTopic, subscriptionName);
conn.start();
TextMessage tm = (TextMessage) durable.receiveNoWait();
ProxyAssertSupport.assertNull(tm);
durable.close();
s.unsubscribe(subscriptionName);
}
finally {
if (conn != null) {
conn.close();
}
}
}
}
/**
* JMS 1.1 6.11.1: A client can change an existing durable subscription by creating a durable
* TopicSubscriber with the same name and a new topic and/or message selector, or NoLocal