git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1359949 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-07-10 23:11:36 +00:00
parent d162fa07b7
commit 4ae6807cc2
2 changed files with 41 additions and 13 deletions

View File

@ -222,7 +222,9 @@ public class Topic extends BaseDestination implements Task {
info = null;
} else {
synchronized (consumers) {
consumers.add(subscription);
if (!consumers.contains(subscription)) {
consumers.add(subscription);
}
}
}
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
public class DurableSubscriptionActivationTest extends org.apache.activemq.TestSupport {
@ -47,7 +47,7 @@ public class DurableSubscriptionActivationTest extends org.apache.activemq.TestS
protected void setUp() throws Exception {
topic = (ActiveMQTopic) createDestination();
createBroker();
createBroker(true);
super.setUp();
}
@ -58,16 +58,18 @@ public class DurableSubscriptionActivationTest extends org.apache.activemq.TestS
protected void restartBroker() throws Exception {
destroyBroker();
createBroker();
createBroker(false);
}
private void createBroker() throws Exception {
private void createBroker(boolean delete) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
broker.setKeepDurableSubsActive(true);
broker.setPersistent(true);
AMQPersistenceAdapter persistenceAdapter = new AMQPersistenceAdapter();
persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
broker.setPersistenceAdapter(persistenceAdapter);
broker.setDeleteAllMessagesOnStartup(delete);
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setDirectory(new File("activemq-data/" + getName() + "-kahadb"));
kahadb.setJournalMaxFileLength(500 * 1024);
broker.setPersistenceAdapter(kahadb);
broker.setBrokerName(getName());
// only if we pre-create the destinations
@ -85,14 +87,14 @@ public class DurableSubscriptionActivationTest extends org.apache.activemq.TestS
broker.stop();
}
public void testActivateWithExistingTopic() throws Exception {
public void testActivateWithExistingTopic1() throws Exception {
// create durable subscription
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
Destination d = broker.getDestination(topic);
assertTrue("More than one consumer.", d.getConsumers().size() == 1);
assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
// restart the broker
restartBroker();
@ -100,7 +102,7 @@ public class DurableSubscriptionActivationTest extends org.apache.activemq.TestS
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
assertTrue("More than one consumer.", d.getConsumers().size() == 1);
assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
// re-activate
connection.close();
@ -108,6 +110,30 @@ public class DurableSubscriptionActivationTest extends org.apache.activemq.TestS
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
assertTrue("More than one consumer.", d.getConsumers().size() == 1);
assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
}
public void testActivateWithExistingTopic2() throws Exception {
// create durable subscription
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
// restart the broker
restartBroker();
// activate
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
Destination d = broker.getDestination(topic);
assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
// re-activate
connection.close();
connection = createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
assertTrue("More than one consumer found: " + d.getConsumers().size(), d.getConsumers().size() == 1);
}
}