mirror of https://github.com/apache/activemq.git
Apply patch that allows for durable subs to migrate when links are stolen.
This commit is contained in:
parent
61a3eab8ab
commit
164e303e21
|
@ -119,8 +119,10 @@ public class TopicRegion extends AbstractRegion {
|
|||
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
|
||||
DurableTopicSubscription sub = durableSubscriptions.get(key);
|
||||
if (sub != null) {
|
||||
if (sub.isActive()) {
|
||||
throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
|
||||
// throw this exception only if link stealing is off
|
||||
if (!context.isAllowLinkStealing() && sub.isActive()) {
|
||||
throw new JMSException("Durable consumer is in use for client: " + clientId +
|
||||
" and subscriptionName: " + subscriptionName);
|
||||
}
|
||||
// Has the selector changed??
|
||||
if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
|
||||
|
@ -146,14 +148,24 @@ public class TopicRegion extends AbstractRegion {
|
|||
if (sub.getConsumerInfo().getConsumerId() != null) {
|
||||
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
|
||||
}
|
||||
// set the info and context to the new ones.
|
||||
// this is set in the activate() call below, but
|
||||
// that call is a NOP if it is already active.
|
||||
// hence need to set here and deactivate it first
|
||||
if ((sub.context != context) || (sub.info != info)) {
|
||||
sub.info = info;
|
||||
sub.context = context;
|
||||
sub.deactivate(keepDurableSubsActive);
|
||||
}
|
||||
subscriptions.put(info.getConsumerId(), sub);
|
||||
}
|
||||
} else {
|
||||
super.addConsumer(context, info);
|
||||
sub = durableSubscriptions.get(key);
|
||||
if (sub == null) {
|
||||
throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
|
||||
+ " subscriberName: " + key.getSubscriptionName());
|
||||
throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() +
|
||||
" for two different durable subscriptions clientID: " + key.getClientId() +
|
||||
" subscriberName: " + key.getSubscriptionName());
|
||||
}
|
||||
}
|
||||
sub.activate(usageManager, context, info, broker);
|
||||
|
@ -166,13 +178,15 @@ public class TopicRegion extends AbstractRegion {
|
|||
@Override
|
||||
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
if (info.isDurable()) {
|
||||
|
||||
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
|
||||
DurableTopicSubscription sub = durableSubscriptions.get(key);
|
||||
if (sub != null) {
|
||||
sub.deactivate(keepDurableSubsActive);
|
||||
// deactivate only if given context is same
|
||||
// as what is in the sub. otherwise, during linksteal
|
||||
// sub will get new context, but will be removed here
|
||||
if (sub.getContext() == context)
|
||||
sub.deactivate(keepDurableSubsActive);
|
||||
}
|
||||
|
||||
} else {
|
||||
super.removeConsumer(context, info);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue