diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java index edf54038cf..06575be689 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.amq.AMQTx; @@ -103,6 +104,11 @@ public interface ReferenceStoreAdapter extends PersistenceAdapter { * @param maxDataFileLength */ void setMaxDataFileLength(long maxDataFileLength); - - + + /** + * Recover particular subscription. Used for recovery of durable consumers + * @param info + * @throws IOException + */ + void recoverSubscription(SubscriptionInfo info) throws IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index fd0294eacb..b962c64b5d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -44,6 +44,7 @@ import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.Location; @@ -586,6 +587,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, } } else { switch (c.getDataStructureType()) { + case SubscriptionInfo.DATA_STRUCTURE_TYPE: { + referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c); + } + break; case JournalQueueAck.DATA_STRUCTURE_TYPE: { JournalQueueAck command = (JournalQueueAck)c; AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination()); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java index b76aa3d5e8..48d4923663 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java @@ -68,6 +68,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { + peristenceAdapter.writeCommand(subscriptionInfo, false); topicReferenceStore.addSubsciption(subscriptionInfo, retroactive); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index d35780ca93..b1525643b3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -28,12 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; @@ -274,6 +274,13 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ts.addSubsciption(info, false); } } + + public void recoverSubscription(SubscriptionInfo info) throws IOException { + TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination()); + LOG.info("Recovering subscriber state for durable subscriber: " + info); + ts.addSubsciption(info, false); + } + public Map retrievePreparedState() throws IOException { Map result = new HashMap(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index 24aa78bb03..d06641bcef 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -174,7 +174,7 @@ public class DurableConsumerTest extends TestCase { Thread publisherThread = new Thread( new MessagePublisher() ); publisherThread.start(); - for( int i = 0; i < 100; i++ ) { + for( int i = 0; i < 200; i++ ) { final int id = i; Thread thread = new Thread( new Runnable() { @@ -280,9 +280,20 @@ public class DurableConsumerTest extends TestCase { assertTrue(exceptions.isEmpty()); } - public void testConsumer() throws Exception{ + public void testConsumerRecover() throws Exception { + doTestConsumer(true); + } + + public void testConsumer() throws Exception { + doTestConsumer(false); + } + + public void doTestConsumer(boolean forceRecover) throws Exception{ - broker.start(); + if (forceRecover) { + configurePersistence(broker); + } + broker.start(); factory = createConnectionFactory(); Connection consumerConnection = factory.createConnection(); @@ -294,6 +305,9 @@ public class DurableConsumerTest extends TestCase { consumerConnection.close(); broker.stop(); broker = createBroker(false); + if (forceRecover) { + configurePersistence(broker); + } broker.start(); Connection producerConnection = factory.createConnection(); @@ -313,6 +327,9 @@ public class DurableConsumerTest extends TestCase { producerConnection.close(); broker.stop(); broker = createBroker(false); + if (forceRecover) { + configurePersistence(broker); + } broker.start(); consumerConnection = factory.createConnection(); diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java index ff47d66bf3..36357421fe 100644 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java +++ b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java @@ -28,6 +28,7 @@ import javax.persistence.Query; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ReferenceStore; @@ -161,4 +162,7 @@ public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements R public void setMaxDataFileLength(long maxDataFileLength) { } + + public void recoverSubscription(SubscriptionInfo info) throws IOException { + } }