Setting the Store based cursor as the default for Durable Subscribers

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@480731 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-29 22:09:33 +00:00
parent 789d4e5541
commit 4597dddf79
13 changed files with 182 additions and 70 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
@ -140,7 +141,8 @@ public class BrokerService implements Service, Serializable {
private ActiveMQDestination[] destinations;
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
//private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy();
/**
@ -383,6 +385,7 @@ public class BrokerService implements Service, Serializable {
startDestinations();
addShutdownHook();
log.info("Using Persistence Adaptor " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) {
deleteAllMessages();
}

View File

@ -64,6 +64,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
if( active || keepDurableSubsActive ) {
Topic topic = (Topic) destination;
topic.activate(context, this);
if (pending.isEmpty(topic)) {
topic.recoverRetroactiveMessages(context, this);
}
}
dispatchMatched();
}
@ -82,6 +85,13 @@ public class DurableTopicSubscription extends PrefetchSubscription {
synchronized(pending) {
pending.start();
}
//If nothing was in the persistent store, then try to use the recovery policy.
if (pending.isEmpty()) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.recoverRetroactiveMessages(context, this);
}
}
dispatchMatched();
}
}

View File

@ -146,7 +146,6 @@ public class Topic implements Destination {
}
public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
// synchronize with dispatch method so that no new messages are sent
// while
// we are recovering a subscription to avoid out of order messages.
@ -210,15 +209,7 @@ public class Topic implements Destination {
});
}
if( true && subscription.getConsumerInfo().isRetroactive() ) {
// If nothing was in the persistent store, then try to use the recovery policy.
if( subscription.getEnqueueCounter() == 0 ) {
subscriptionRecoveryPolicy.recover(context, this, subscription);
} else {
// TODO: implement something like
// subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
}
}
}
finally {
@ -231,7 +222,15 @@ public class Topic implements Destination {
consumers.remove(sub);
}
sub.remove(context, this);
}
}
protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception{
if(subscription.getConsumerInfo().isRetroactive()){
subscriptionRecoveryPolicy.recover(context,this,subscription);
}
}
public void send(final ConnectionContext context, final Message message) throws Exception {

View File

@ -66,6 +66,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public boolean isEmpty(){
return false;
}
public boolean isEmpty(Destination destination) {
return isEmpty();
}
public MessageReference next(){
return null;

View File

@ -49,6 +49,13 @@ public interface PendingMessageCursor extends Service{
*/
public boolean isEmpty();
/**
* check if a Destination is Empty for this cursor
* @param destination
* @return true id the Destination is empty
*/
public boolean isEmpty(Destination destination);
/**
* reset the cursor
*

View File

@ -19,6 +19,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@ -85,14 +86,16 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
* @throws Exception
*/
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setUsageManager(usageManager);
topics.put(destination,tsp);
storePrefetches.add(tsp);
if(started){
tsp.start();
pendingCount+=tsp.size();
if(destination!=null&&!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())){
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setUsageManager(usageManager);
topics.put(destination,tsp);
storePrefetches.add(tsp);
if(started){
tsp.start();
pendingCount+=tsp.size();
}
}
}
@ -116,6 +119,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
public synchronized boolean isEmpty(){
return pendingCount<=0;
}
public boolean isEmpty(Destination destination) {
boolean result = true;
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination);
if(tsp!=null){
result = tsp.size() <= 0;
}
return result;
}
/**
* Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber

View File

@ -88,4 +88,6 @@ public interface JDBCAdapter{
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException;
}

View File

@ -99,7 +99,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
if(last==null){
last=new AtomicLong(-1);
long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName);
last=new AtomicLong(lastAcked);
subscriberLastMessageMap.put(subcriberId,last);
}
final AtomicLong finalLast=last;

View File

@ -64,7 +64,7 @@ public class Statements {
private String lockUpdateStatement;
private String nextDurableSubscriberMessageStatement;
private String durableSubscriberMessageCountStatement;
private String nextDurableSubscriberMessageIdStatement;
private String lastAckedDurableSubscriberMessageStatement;
private String destinationMessageCountStatement;
private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
@ -322,6 +322,18 @@ public class Statements {
}
return findNextMessagesStatement;
}
/**
* @return the lastAckedDurableSubscriberMessageStatement
*/
public String getLastAckedDurableSubscriberMessageStatement(){
if(lastAckedDurableSubscriberMessageStatement==null) {
lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return lastAckedDurableSubscriberMessageStatement;
}
public String getFullMessageTableName() {
@ -590,20 +602,7 @@ public class Statements {
*/
public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
}
/**
* @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set
*/
public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){
this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
}
}
/**
* @param findNextMessagesStatement the findNextMessagesStatement to set
@ -618,6 +617,16 @@ public class Statements {
public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
this.destinationMessageCountStatement=destinationMessageCountStatement;
}
/**
* @param lastAckedDurableSubscriberMessageStatement the lastAckedDurableSubscriberMessageStatement to set
*/
public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){
this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement;
}

View File

@ -544,6 +544,28 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
close(s);
}
}
public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException{
PreparedStatement s=null;
ResultSet rs=null;
long result = -1;
try{
s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1,destination.getQualifiedName());
s.setString(2,clientId);
s.setString(3,subscriberName);
rs=s.executeQuery();
if(rs.next()){
result=rs.getLong(1);
}
rs.close();
s.close();
}finally{
close(rs);
close(s);
}
return result;
}
static private void close(PreparedStatement s){
try{

View File

@ -112,6 +112,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
//add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
@ -124,24 +125,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}
}
public synchronized void deleteSubscription(String clientId,String subscriptionName){
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
removeSubscriberMessageContainer(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
@ -224,6 +210,26 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
subscriberMessages.put(key,tsc);
return container;
}
protected void removeSubscriberMessageContainer(Object key) throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
store.deleteListContainer(key,"topic-subs");
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);

View File

@ -132,24 +132,9 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
}
}
public synchronized void deleteSubscription(String clientId,String subscriptionName){
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
removeSubscriberMessageContainer(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
@ -221,6 +206,26 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
subscriberMessages.put(key,tsc);
return container;
}
protected void removeSubscriberMessageContainer(Object key) throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
store.deleteListContainer(key,"topic-subs");
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);

View File

@ -0,0 +1,32 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/**
* @version $Revision: 1.1.1.1 $
*/
public class KahaDurableSubscriptionTest extends DurableSubscriptionTestSupport{
protected PersistenceAdapter createPersistenceAdapter() throws IOException{
File dataDir=new File("target/test-data/durableKaha");
KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(dataDir);
return adaptor;
}
}