diff --git a/activemq-core/project.xml b/activemq-core/project.xml
index 6dc151282b..495bfeda62 100755
--- a/activemq-core/project.xml
+++ b/activemq-core/project.xml
@@ -347,6 +347,9 @@
**/ProxyConnectorTest.*
+
+ **/FanoutTransportBrokerTest.*
+
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
index 546d07ec8b..b7f382d388 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
@@ -50,8 +50,8 @@ public class ManagedQueueRegion extends QueueRegion {
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
index 6099d09fda..2d0596d1d8 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
@@ -48,8 +48,8 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
index e0fe85eff0..4f7d712ce7 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
@@ -48,8 +48,8 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
index 56783acb81..500b93a3d2 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
@@ -50,8 +50,8 @@ public class ManagedTopicRegion extends TopicRegion {
super.destroySubscription(sub);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
- Destination rc = super.createDestination(destination);
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
+ Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 3459d00d80..5065ad204b 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -73,7 +73,7 @@ abstract public class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
log.debug("Adding destination: "+destination);
- Destination dest = createDestination(destination);
+ Destination dest = createDestination(context, destination);
dest.start();
synchronized(destinationsMutex){
destinations.put(destination,dest);
@@ -241,7 +241,7 @@ abstract public class AbstractRegion implements Region {
}
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Throwable;
- abstract protected Destination createDestination(ActiveMQDestination destination) throws Throwable;
+ abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable;
public boolean isAutoCreateDestinations() {
return autoCreateDestinations;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e3603abbc7..bc1ea84c2c 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -27,76 +27,66 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class DurableTopicSubscription extends PrefetchSubscription {
- final protected String clientId;
- final protected String subscriptionName;
- final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
-
- boolean active=true;
- boolean recovered=true;
+ private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
+ private final ConcurrentHashMap destinations = new ConcurrentHashMap();
+ private final SubscriptionKey subscriptionKey;
+ private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
- this.clientId = context.getClientId();
- this.subscriptionName = info.getSubcriptionName();
+ subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
- public DurableTopicSubscription(Broker broker,SubscriptionInfo info) throws InvalidSelectorException {
- super(broker,null, createFakeConsumerInfo(info));
- this.clientId = info.getClientId();
- this.subscriptionName = info.getSubcriptionName();
- active=false;
- recovered=false;
- }
-
- private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
- ConsumerInfo rc = new ConsumerInfo();
- rc.setSelector(info.getSelector());
- rc.setSubcriptionName(info.getSubcriptionName());
- rc.setDestination(info.getDestination());
- return rc;
- }
-
synchronized public boolean isActive() {
return active;
}
- synchronized public boolean isRecovered() {
- return recovered;
- }
protected boolean isFull() {
return !active || super.isFull();
}
synchronized public void gc() {
- if( !active && recovered ) {
- recovered = false;
-
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- // node.decrementTargetCount();
- iter.remove();
+ }
+
+ synchronized public void add(ConnectionContext context, Destination destination) throws Throwable {
+ super.add(context, destination);
+ destinations.put(destination.getActiveMQDestination(), destination);
+ if( active ) {
+ Topic topic = (Topic) destination;
+ topic.activate(context, this);
+ }
+ }
+
+ synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
+ if( !active ) {
+ this.active = true;
+ this.context = context;
+ this.info = info;
+ for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.activate(context, this);
}
-
- for (Iterator iter = matched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- // node.decrementTargetCount();
- iter.remove();
+ if( !isFull() ) {
+ dispatchMatched();
}
-
- delivered=0;
}
}
- synchronized public void deactivate() {
+ synchronized public void deactivate() throws Throwable {
active=false;
+ for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.deactivate(context, this);
+ }
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-
+
+ // Mark the dispatched messages as redelivered for next time.
MessageReference node = (MessageReference) iter.next();
Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
if( count !=null ) {
@@ -105,32 +95,16 @@ public class DurableTopicSubscription extends PrefetchSubscription {
redeliveredMessages.put(node.getMessageId(), new Integer(1));
}
- // Undo the dispatch.
- matched.addFirst(node);
iter.remove();
}
+ for (Iterator iter = matched.iterator(); iter.hasNext();) {
+ MessageReference node = (MessageReference) iter.next();
+ // node.decrementTargetCount();
+ iter.remove();
+ }
delivered=0;
}
- synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
- if( !active ) {
- this.active = true;
- this.context = context;
- this.info = info;
- if( !recovered ) {
- recovered=true;
- for (Iterator iter = destinations.iterator(); iter.hasNext();) {
- Topic topic = (Topic) iter.next();
- topic.recover(context, this, false);
- }
- } else {
- if( !isFull() ) {
- dispatchMatched();
- }
- }
- }
- }
-
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
@@ -141,7 +115,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
synchronized public void add(MessageReference node) throws Throwable {
- assert recovered;
+ if( !active ) {
+ return;
+ }
node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
super.add(node);
node.decrementReferenceCount();
@@ -152,7 +128,6 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable {
- assert recovered;
super.acknowledge(context, ack);
}
@@ -163,7 +138,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public String getSubscriptionName() {
- return subscriptionName;
+ return subscriptionKey.getSubscriptionName();
}
public String toString() {
@@ -177,7 +152,11 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public String getClientId() {
- return clientId;
+ return subscriptionKey.getClientId();
+ }
+
+ public SubscriptionKey getSubscriptionKey() {
+ return subscriptionKey;
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index d36c847107..9ff6b96ed9 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -37,6 +37,7 @@ import java.util.LinkedList;
* @version $Revision: 1.15 $
*/
abstract public class PrefetchSubscription extends AbstractSubscription{
+
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched=new LinkedList();
final protected LinkedList dispatched=new LinkedList();
@@ -44,13 +45,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
int preLoadLimit=1024*100;
int preLoadSize=0;
boolean dispatching=false;
-
+
+ long enqueueCounter;
+ long dispatchCounter;
+
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
super(broker,context,info);
}
synchronized public void add(MessageReference node) throws Throwable{
+ enqueueCounter++;
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
@@ -244,8 +249,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
+ dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message);
- dispatched.addLast(node);
+ dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
@@ -325,4 +331,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{}
+
+
+ public long getDispatchCounter() {
+ return dispatchCounter;
+ }
+
+ public long getEnqueueCounter() {
+ return enqueueCounter;
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
index ce0cd583ce..c61328a615 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
@@ -54,7 +54,7 @@ public class QueueRegion extends AbstractRegion {
// Implementation methods
// -------------------------------------------------------------------------
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
index b9247046b7..f111c0d1e4 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
@@ -38,7 +38,7 @@ public class TempQueueRegion extends AbstractRegion {
setAutoCreateDestinations(false);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
index 894b22c2a9..5be51470ea 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
@@ -37,7 +37,7 @@ public class TempTopicRegion extends AbstractRegion {
setAutoCreateDestinations(false);
}
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
index cb21ebdf47..01a2a7bcde 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -42,8 +42,8 @@ import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* The Topic is a destination that sends a copy of a message to every active
@@ -64,7 +64,7 @@ public class Topic implements Destination {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
- private AtomicInteger durableSubscriberCounter = new AtomicInteger();
+ private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
@@ -72,15 +72,6 @@ public class Topic implements Destination {
this.destination = destination;
this.store = store;
this.usageManager = memoryManager;
-
- // TODO: switch back when cache is working again.
- // this.cache = cache;
- // destinationStatistics.setMessagesCached(cache.getMessagesCached());
- // CacheEvictionUsageListener listener = new
- // CacheEvictionUsageListener(memoryManager, 90, 50, taskFactory);
- // listener.add(cache);
- // this.memoryManager.addUsageListener(listener);
-
this.destinationStatistics.setParent(parentStats);
}
@@ -89,142 +80,147 @@ public class Topic implements Destination {
}
public void addSubscription(ConnectionContext context, final Subscription sub) throws Throwable {
- destinationStatistics.getConsumers().increment();
+
sub.add(context, this);
- if (sub.getConsumerInfo().isDurable()) {
- recover(context, (DurableTopicSubscription) sub, true);
- }
- else {
- recover(context, sub);
- }
- }
- /**
- * Used to recover the message list non durable subscriptions. Recovery only happens if the consumer is
- * retroactive.
- *
- * @param context
- * @param sub
- * @throws Throwable
- */
- private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
- if (sub.getConsumerInfo().isRetroactive()) {
+ if ( !sub.getConsumerInfo().isDurable() ) {
- // synchronize with dispatch method so that no new messages are sent
- // while we are recovering a subscription to avoid out of order messages.
- dispatchValve.turnOff();
- try {
+ destinationStatistics.getConsumers().increment();
+
+ // Do a retroactive recovery if needed.
+ if (sub.getConsumerInfo().isRetroactive()) {
+ // synchronize with dispatch method so that no new messages are sent
+ // while we are recovering a subscription to avoid out of order messages.
+ dispatchValve.turnOff();
+ try {
+
+ synchronized(consumers) {
+ consumers.add(sub);
+ }
+ subscriptionRecoveryPolicy.recover(context, this, sub);
+
+ } finally {
+ dispatchValve.turnOn();
+ }
+
+ } else {
synchronized(consumers) {
consumers.add(sub);
}
- subscriptionRecoveryPolicy.recover(context, this, sub);
-
- } finally {
- dispatchValve.turnOn();
- }
-
+ }
} else {
- synchronized(consumers) {
- consumers.add(sub);
- }
+ DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+ durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
}
-
- /**
- * Used to recover the message list for a durable subscription.
- *
- * @param context
- * @param sub
- * @param initialActivation
- * @throws Throwable
- */
- public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
-
+
+ public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
+ if ( !sub.getConsumerInfo().isDurable() ) {
+ destinationStatistics.getConsumers().decrement();
+ synchronized(consumers) {
+ consumers.remove(sub);
+ }
+ }
+ sub.remove(context, this);
+ }
+
+ public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {
+ sub.add(context, this);
+ destinationStatistics.getConsumers().increment();
+ durableSubcribers.put(sub.getSubscriptionKey(), sub);
+ }
+
+ public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
+ if (store != null) {
+ store.deleteSubscription(key.clientId, key.subscriptionName);
+ durableSubcribers.remove(key);
+ destinationStatistics.getConsumers().decrement();
+ }
+ }
+
+ public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Throwable {
+
// synchronize with dispatch method so that no new messages are sent
// while
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
-
- boolean persistenceWasOptimized = canOptimizeOutPersistence();
- if (initialActivation) {
- synchronized(consumers) {
- consumers.add(sub);
- durableSubscriberCounter.incrementAndGet();
+
+ synchronized(consumers) {
+ consumers.add(subscription);
+ }
+
+ if (store == null )
+ return;
+
+ // Recover the durable subscription.
+ String clientId = subscription.getClientId();
+ String subscriptionName = subscription.getSubscriptionName();
+ String selector = subscription.getConsumerInfo().getSelector();
+ SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
+ if (info != null) {
+ // Check to see if selector changed.
+ String s1 = info.getSelector();
+ if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
+ // Need to delete the subscription
+ store.deleteSubscription(clientId, subscriptionName);
+ info = null;
}
}
-
- if (store != null) {
- String clientId = sub.getClientId();
- String subscriptionName = sub.getSubscriptionName();
- String selector = sub.getConsumerInfo().getSelector();
- SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
- if (info != null) {
- // Check to see if selector changed.
- String s1 = info.getSelector();
- if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
- // Need to delete the subscription
- store.deleteSubscription(clientId, subscriptionName);
- info = null;
- }
- }
- // Do we need to create the subscription?
- if (info == null) {
- store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
- }
-
- if (sub.isRecovered()) {
- final MessageEvaluationContext msgContext = new MessageEvaluationContext();
- msgContext.setDestination(destination);
- store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
- public void recoverMessage(Message message) throws Throwable {
- message.setRegionDestination(Topic.this);
- try {
- msgContext.setMessageReference(message);
- if (sub.matches(message, msgContext)) {
- sub.add(message);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (IOException e) {
- // TODO: Need to handle this better.
- e.printStackTrace();
- }
- }
-
- public void recoverMessageReference(String messageReference) throws Throwable {
- throw new RuntimeException("Should not be called.");
- }
- });
-
- if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
- // Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
- if( persistenceWasOptimized ) {
- subscriptionRecoveryPolicy.recover(context, this, sub);
- } else {
- // TODO: implement something like
- // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
+ // Do we need to create the subscription?
+ if (info == null) {
+ store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
+ }
+
+ final MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ msgContext.setDestination(destination);
+ store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
+ public void recoverMessage(Message message) throws Throwable {
+ message.setRegionDestination(Topic.this);
+ try {
+ msgContext.setMessageReference(message);
+ if (subscription.matches(message, msgContext)) {
+ subscription.add(message);
}
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (IOException e) {
+ // TODO: Need to handle this better.
+ e.printStackTrace();
+ }
+ }
+
+ public void recoverMessageReference(String messageReference) throws Throwable {
+ throw new RuntimeException("Should not be called.");
+ }
+ });
+
+ if( true && subscription.getConsumerInfo().isRetroactive() ) {
+ // If nothing was in the persistent store, then try to use the recovery policy.
+ if( subscription.getEnqueueCounter() == 0 ) {
+ subscriptionRecoveryPolicy.recover(context, this, subscription);
+ } else {
+ // TODO: implement something like
+ // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
}
}
-
+
}
finally {
dispatchValve.turnOn();
}
}
- public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
- destinationStatistics.getConsumers().decrement();
- synchronized(consumers) {
+ public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {
+ synchronized(consumers) {
consumers.remove(sub);
}
sub.remove(context, this);
- }
+ }
+
public void send(final ConnectionContext context, final Message message) throws Throwable {
@@ -259,18 +255,7 @@ public class Topic implements Destination {
}
private boolean canOptimizeOutPersistence() {
- return durableSubscriberCounter.get()==0;
- }
-
- public void createSubscription(SubscriptionKey key) {
- durableSubscriberCounter.incrementAndGet();
- }
-
- public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
- if (store != null) {
- store.deleteSubscription(key.clientId, key.subscriptionName);
- durableSubscriberCounter.decrementAndGet();
- }
+ return durableSubcribers.size()==0;
}
public String toString() {
@@ -424,4 +409,5 @@ public class Topic implements Destination {
}
}
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index f16b39545e..9b41f123e6 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -86,19 +86,21 @@ public class TopicRegion extends AbstractRegion {
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
-
+ sub = (DurableTopicSubscription) durableSubscriptions.get(key);
}
else {
// Change the consumer id key of the durable sub.
if( sub.getConsumerInfo().getConsumerId()!=null )
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
subscriptions.put(info.getConsumerId(), sub);
- sub.activate(context, info);
}
}
else {
super.addConsumer(context, info);
+ sub = (DurableTopicSubscription) durableSubscriptions.get(key);
}
+
+ sub.activate(context, info);
}
else {
super.addConsumer(context, info);
@@ -145,7 +147,7 @@ public class TopicRegion extends AbstractRegion {
// Implementation methods
// -------------------------------------------------------------------------
- protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
+ protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
@@ -154,13 +156,30 @@ public class TopicRegion extends AbstractRegion {
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
for (int i = 0; i < infos.length; i++) {
- log.info("Restoring durable subscription: "+infos[i]);
- createDurableSubscription(topic, infos[i]);
+
+ SubscriptionInfo info = infos[i];
+ log.debug("Restoring durable subscription: "+infos);
+ SubscriptionKey key = new SubscriptionKey(info);
+
+ // A single durable sub may be subscribing to multiple topics. so it might exist already.
+ DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+ if( sub == null ) {
+ sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
+ }
+ topic.addInactiveSubscription(context, sub);
}
}
return topic;
}
+
+ private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+ ConsumerInfo rc = new ConsumerInfo();
+ rc.setSelector(info.getSelector());
+ rc.setSubcriptionName(info.getSubcriptionName());
+ rc.setDestination(info.getDestination());
+ return rc;
+ }
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (policyMap != null) {
@@ -188,17 +207,6 @@ public class TopicRegion extends AbstractRegion {
return new TopicSubscription(broker,context, info, memoryManager);
}
}
-
- public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
- SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
- topic.createSubscription(key);
- DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- sub = new DurableTopicSubscription(broker,info);
- sub.add(null, topic);
- durableSubscriptions.put(key, sub);
- return sub;
- }
-
/**
*/
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java b/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
index fa6d4e1908..cbd4622664 100755
--- a/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/SubscriptionKey.java
@@ -16,17 +16,26 @@
*/
package org.apache.activemq.util;
+import org.apache.activemq.command.SubscriptionInfo;
+
public class SubscriptionKey {
+
public final String clientId;
public final String subscriptionName;
private final int hashValue;
+
+ public SubscriptionKey(SubscriptionInfo info) {
+ this(info.getClientId(), info.getSubcriptionName());
+ }
+
public SubscriptionKey(String clientId, String subscriptionName) {
this.clientId = clientId;
this.subscriptionName = subscriptionName;
hashValue = clientId.hashCode()^subscriptionName.hashCode();
}
-
+
+
public int hashCode() {
return hashValue;
}
@@ -43,4 +52,12 @@ public class SubscriptionKey {
public String toString() {
return clientId+":"+subscriptionName;
}
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getSubscriptionName() {
+ return subscriptionName;
+ }
}
\ No newline at end of file
diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
index 5ce541ec50..695ee998dc 100755
--- a/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -185,7 +185,10 @@ public class JmsRedeliveredTest extends TestCase {
Topic topic = session.createTopic("topic-"+getName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
- MessageProducer producer = createProducer(session, topic);
+ // This case only works with persistent messages since transient messages
+ // are dropped when the consumer goes offline.
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(createTextMessage(session));
// Consume the message...