mirror of https://github.com/apache/activemq.git
Fix for DurableConsumerCloseAndReconnectTest,
- The eager loading of durable subs had broken the test a little.. fixed it up now. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@376861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
30b2e67b91
commit
a71a2a5dc0
|
@ -149,7 +149,7 @@ public class Topic implements Destination {
|
|||
|
||||
boolean persistenceWasOptimized = canOptimizeOutPersistence();
|
||||
if (initialActivation) {
|
||||
synchronized(consumers) {
|
||||
synchronized(consumers) {
|
||||
consumers.add(sub);
|
||||
durableSubscriberCounter.incrementAndGet();
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ public class Topic implements Destination {
|
|||
info = null;
|
||||
}
|
||||
}
|
||||
// Do we need to crate the subscription?
|
||||
// Do we need to create the subscription?
|
||||
if (info == null) {
|
||||
store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
|
||||
}
|
||||
|
@ -222,9 +222,6 @@ public class Topic implements Destination {
|
|||
destinationStatistics.getConsumers().decrement();
|
||||
synchronized(consumers) {
|
||||
consumers.remove(sub);
|
||||
if( sub.getConsumerInfo().isDurable() ) {
|
||||
durableSubscriberCounter.decrementAndGet();
|
||||
}
|
||||
}
|
||||
sub.remove(context, this);
|
||||
}
|
||||
|
@ -265,9 +262,14 @@ public class Topic implements Destination {
|
|||
return durableSubscriberCounter.get()==0;
|
||||
}
|
||||
|
||||
public void createSubscription(SubscriptionKey key) {
|
||||
durableSubscriberCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
|
||||
if (store != null) {
|
||||
store.deleteSubscription(key.clientId, key.subscriptionName);
|
||||
durableSubscriberCounter.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -59,6 +59,13 @@ public class TopicRegion extends AbstractRegion {
|
|||
|
||||
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
|
||||
if (info.isDurable()) {
|
||||
|
||||
ActiveMQDestination destination = info.getDestination();
|
||||
if( !destination.isPattern() ) {
|
||||
// Make sure the destination is created.
|
||||
lookup(context, destination);
|
||||
}
|
||||
|
||||
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
|
||||
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
|
||||
if (sub != null) {
|
||||
|
@ -148,7 +155,7 @@ public class TopicRegion extends AbstractRegion {
|
|||
SubscriptionInfo[] infos = store.getAllSubscriptions();
|
||||
for (int i = 0; i < infos.length; i++) {
|
||||
log.info("Restoring durable subscription: "+infos[i]);
|
||||
createDurableSubscription(infos[i]);
|
||||
createDurableSubscription(topic, infos[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,10 +189,12 @@ public class TopicRegion extends AbstractRegion {
|
|||
}
|
||||
}
|
||||
|
||||
public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
|
||||
public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
|
||||
SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
|
||||
topic.createSubscription(key);
|
||||
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
|
||||
sub = new DurableTopicSubscription(broker,info);
|
||||
sub.add(null, topic);
|
||||
durableSubscriptions.put(key, sub);
|
||||
return sub;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue