final fix for https://issues.apache.org/activemq/browse/AMQ-2303 - durable subsciber recovery

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@790113 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-07-01 10:58:44 +00:00
parent 4ee34c9609
commit 101c35b8e8
6 changed files with 46 additions and 6 deletions

View File

@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.amq.AMQTx;
@ -103,6 +104,11 @@ public interface ReferenceStoreAdapter extends PersistenceAdapter {
* @param maxDataFileLength
*/
void setMaxDataFileLength(long maxDataFileLength);
/**
* Recover particular subscription. Used for recovery of durable consumers
* @param info
* @throws IOException
*/
void recoverSubscription(SubscriptionInfo info) throws IOException;
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
@ -586,6 +587,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
} else {
switch (c.getDataStructureType()) {
case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
}
break;
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command = (JournalQueueAck)c;
AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());

View File

@ -68,6 +68,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
peristenceAdapter.writeCommand(subscriptionInfo, false);
topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
}

View File

@ -28,12 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
@ -274,6 +274,13 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
ts.addSubsciption(info, false);
}
}
public void recoverSubscription(SubscriptionInfo info) throws IOException {
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
LOG.info("Recovering subscriber state for durable subscriber: " + info);
ts.addSubsciption(info, false);
}
public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();

View File

@ -174,7 +174,7 @@ public class DurableConsumerTest extends TestCase {
Thread publisherThread = new Thread( new MessagePublisher() );
publisherThread.start();
for( int i = 0; i < 100; i++ ) {
for( int i = 0; i < 200; i++ ) {
final int id = i;
Thread thread = new Thread( new Runnable() {
@ -280,9 +280,20 @@ public class DurableConsumerTest extends TestCase {
assertTrue(exceptions.isEmpty());
}
public void testConsumer() throws Exception{
public void testConsumerRecover() throws Exception {
doTestConsumer(true);
}
public void testConsumer() throws Exception {
doTestConsumer(false);
}
public void doTestConsumer(boolean forceRecover) throws Exception{
broker.start();
if (forceRecover) {
configurePersistence(broker);
}
broker.start();
factory = createConnectionFactory();
Connection consumerConnection = factory.createConnection();
@ -294,6 +305,9 @@ public class DurableConsumerTest extends TestCase {
consumerConnection.close();
broker.stop();
broker = createBroker(false);
if (forceRecover) {
configurePersistence(broker);
}
broker.start();
Connection producerConnection = factory.createConnection();
@ -313,6 +327,9 @@ public class DurableConsumerTest extends TestCase {
producerConnection.close();
broker.stop();
broker = createBroker(false);
if (forceRecover) {
configurePersistence(broker);
}
broker.start();
consumerConnection = factory.createConnection();

View File

@ -28,6 +28,7 @@ import javax.persistence.Query;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
@ -161,4 +162,7 @@ public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements R
public void setMaxDataFileLength(long maxDataFileLength) {
}
public void recoverSubscription(SubscriptionInfo info) throws IOException {
}
}