git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@442550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-09-12 10:07:34 +00:00
parent 72cc9e9aa0
commit 4eef609524
26 changed files with 1036 additions and 124 deletions

View File

@ -155,4 +155,8 @@ abstract public class AbstractSubscription implements Subscription {
public int getPrefetchSize() {
return info.getPrefetchSize();
}
public boolean isRecoveryRequired(){
return true;
}
}

View File

@ -24,6 +24,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -40,7 +41,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
//super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
//super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
// super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
@ -78,12 +80,14 @@ public class DurableTopicSubscription extends PrefetchSubscription {
topic.activate(context, this);
}
}
pending.start();
dispatchMatched();
}
}
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false;
pending.stop();
if( !keepDurableSubsActive ) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();

View File

@ -188,7 +188,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(context.isInTransaction()) {
// extend prefetch window only if not a pulling consumer
if (getPrefetchSize() != 0) {
prefetchExtension=Math.max(prefetchExtension,index+1);
prefetchExtension=Math.max(prefetchExtension,index+1);
}
}
else {
@ -316,6 +316,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return enqueueCounter;
}
public boolean isRecoveryRequired(){
return pending.isRecoveryRequired();
}
/**
* optimize message consumer prefetch if the consumer supports it
*
@ -336,7 +340,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/
}
public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context,destination);
pending.add(context,destination);
}
public void remove(ConnectionContext context, Destination destination) throws Exception {
super.remove(context,destination);
pending.remove(context,destination);
}
protected void dispatchMatched() throws IOException{

View File

@ -190,5 +190,13 @@ public interface Subscription {
* @return the prefetch size that is configured for the subscription
*/
int getPrefetchSize();
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired();
}

View File

@ -180,31 +180,30 @@ public class Topic implements Destination {
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(destination);
store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
public void recoverMessage(Message message) throws Exception {
message.setRegionDestination(Topic.this);
try {
msgContext.setMessageReference(message);
if (subscription.matches(message, msgContext)) {
subscription.add(message);
if(subscription.isRecoveryRequired()){
store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(Topic.this);
try{
msgContext.setMessageReference(message);
if(subscription.matches(message,msgContext)){
subscription.add(message);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}catch(IOException e){
// TODO: Need to handle this better.
e.printStackTrace();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
public void recoverMessageReference(String messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}
catch (IOException e) {
// TODO: Need to handle this better.
e.printStackTrace();
}
}
public void recoverMessageReference(String messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
public void finished(){
}
});
public void finished(){}
});
}
if( true && subscription.getConsumerInfo().isRetroactive() ) {
// If nothing was in the persistent store, then try to use the recovery policy.

View File

@ -24,6 +24,7 @@ import java.util.Set;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@ -216,6 +217,9 @@ public class TopicRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) {

View File

@ -0,0 +1,43 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
/**
* Default method holder for pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor{
public void start() throws Exception{
}
public void stop() throws Exception{
}
public void add(ConnectionContext context, Destination destination) throws Exception{
}
public void remove(ConnectionContext context, Destination destination) throws Exception{
}
public boolean isRecoveryRequired(){
return true;
}
}

View File

@ -12,12 +12,14 @@
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.*;
import java.util.Iterator;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.*;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
/**
@ -25,25 +27,26 @@ import org.apache.activemq.store.kahadaptor.CommandMarshaller;
*
* @version $Revision$
*/
public class FilePendingMessageCursor implements PendingMessageCursor{
public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
private ListContainer list;
private Iterator iter = null;
private Iterator iter=null;
private Destination regionDestination;
/**
* @param name
* @param store
* @throws IOException
*/
public FilePendingMessageCursor(String name, Store store) {
public FilePendingMessageCursor(String name,Store store){
try{
list = store.getListContainer(name);
list=store.getListContainer(name);
list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
list.setMaximumCacheSize(0);
}catch(IOException e){
throw new RuntimeException(e);
}
}
/**
* @return true if there are no pending messages
*/
@ -53,12 +56,12 @@ public class FilePendingMessageCursor implements PendingMessageCursor{
/**
* reset the cursor
*
*
*/
public void reset(){
iter = list.listIterator();
iter=list.listIterator();
}
/**
* add message to await dispatch
*
@ -66,42 +69,42 @@ public class FilePendingMessageCursor implements PendingMessageCursor{
*/
public void addMessageLast(MessageReference node){
try{
regionDestination = node.getMessage().getRegionDestination();
regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
throw new RuntimeException(e);
}
list.addLast(node);
}
/**
* add message to await dispatch
* @param position
*
* @param position
* @param node
*/
public void addMessageFirst(MessageReference node){
try{
regionDestination = node.getMessage().getRegionDestination();
regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
throw new RuntimeException(e);
}
list.addFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext(){
return iter.hasNext();
return iter.hasNext();
}
/**
* @return the next pending message
*/
public MessageReference next(){
Message message = (Message) iter.next();
Message message=(Message) iter.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
return message;

View File

@ -13,6 +13,9 @@
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
/**
@ -20,7 +23,24 @@ import org.apache.activemq.broker.region.MessageReference;
*
* @version $Revision$
*/
public interface PendingMessageCursor{
public interface PendingMessageCursor extends Service{
/**
* Add a destination
* @param context
* @param destination
* @throws Exception
*/
public void add(ConnectionContext context, Destination destination) throws Exception;
/**
* remove a destination
* @param context
* @param destination
* @throws Exception
*/
public void remove(ConnectionContext context, Destination destination) throws Exception;
/**
* @return true if there are no pending messages
*/
@ -70,4 +90,12 @@ public interface PendingMessageCursor{
*
*/
public void clear();
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired();
}

View File

@ -0,0 +1,180 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
private int pendingCount=0;
private String clientId;
private String subscriberName;
private int maxBatchSize=10;
private LinkedList batchList=new LinkedList();
private Map topics=new HashMap();
private LinkedList storePrefetches=new LinkedList();
private AtomicBoolean started=new AtomicBoolean();
/**
* @param topic
* @param clientId
* @param subscriberName
* @throws IOException
*/
public StoreDurableSubscriberCursor(String clientId,String subscriberName){
this.clientId=clientId;
this.subscriberName=subscriberName;
}
public synchronized void start() throws Exception{
started.set(true);
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
tsp.start();
pendingCount+=tsp.size();
}
}
public synchronized void stop() throws Exception{
started.set(false);
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
tsp.stop();
}
pendingCount=0;
}
/**
* Add a destination
*
* @param context
* @param destination
* @throws Exception
*/
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
topics.put(destination,tsp);
storePrefetches.add(tsp);
if(started.get()){
tsp.start();
pendingCount+=tsp.size();
}
}
/**
* remove a destination
*
* @param context
* @param destination
* @throws Exception
*/
public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
if(tsp!=null){
storePrefetches.remove(tsp);
}
}
/**
* @return true if there are no pending messages
*/
public synchronized boolean isEmpty(){
return pendingCount<=0;
}
/**
* Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
* may do
*
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired(){
return false;
}
public synchronized void addMessageFirst(MessageReference node){
pendingCount++;
}
public synchronized void addMessageLast(MessageReference node){
pendingCount++;
}
public void clear(){
pendingCount=0;
}
public synchronized boolean hasNext(){
return !isEmpty();
}
public synchronized MessageReference next(){
MessageReference result=null;
if(!isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Couldn't fill batch from store ",e);
throw new RuntimeException(e);
}
}
if(!batchList.isEmpty()){
result=(MessageReference) batchList.removeFirst();
}
}
return result;
}
public synchronized void remove(){
pendingCount--;
}
public void reset(){
batchList.clear();
}
public int size(){
return pendingCount;
}
private synchronized void fillBatch() throws Exception{
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
tsp.fillBatch();
if(batchList.size()>=maxBatchSize){
break;
}
}
// round-robin
Object obj=storePrefetches.removeFirst();
storePrefetches.addLast(obj);
}
}

View File

@ -0,0 +1,156 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
private Topic topic;
private TopicMessageStore store;
private LinkedList batchList;
private String clientId;
private String subscriberName;
private int pendingCount=0;
private MessageId lastMessageId;
private int maxBatchSize=10;
/**
* @param topic
* @param batchList
* @param clientId
* @param subscriberName
* @throws IOException
*/
public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
this.topic=topic;
this.store=(TopicMessageStore) topic.getMessageStore();
this.batchList=batchList;
this.clientId=clientId;
this.subscriberName=subscriberName;
}
public void start() throws Exception{
pendingCount=store.getMessageCount(clientId,subscriberName);
System.err.println("Pending count = "+pendingCount);
}
public void stop() throws Exception{
pendingCount=0;
lastMessageId=null;
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return pendingCount==0;
}
/**
* Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
* may do
*
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired(){
return false;
}
public synchronized void addMessageFirst(MessageReference node){
pendingCount++;
}
public synchronized void addMessageLast(MessageReference node){
pendingCount++;
}
public void clear(){
pendingCount=0;
lastMessageId=null;
}
public synchronized boolean hasNext(){
return !isEmpty();
}
public synchronized MessageReference next(){
MessageReference result=null;
if(!isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
throw new RuntimeException(e);
}
}
result=(MessageReference) batchList.removeFirst();
}
return result;
}
public synchronized void remove(){
pendingCount--;
}
public void reset(){
batchList.clear();
}
public int size(){
return pendingCount;
}
// MessageRecoveryListener implementation
public void finished(){}
public void recoverMessage(Message message) throws Exception{
batchList.addLast(message);
}
public void recoverMessageReference(String messageReference) throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
// implementation
protected void fillBatch() throws Exception{
if(pendingCount<=0){
pendingCount=store.getMessageCount(clientId,subscriberName);
}
if(pendingCount>0){
store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message) batchList.getLast();
lastMessageId=message.getMessageId();
}
}
}
}

View File

@ -20,7 +20,7 @@ import org.apache.activemq.broker.region.MessageReference;
*
* @version $Revision$
*/
public class VMPendingMessageCursor implements PendingMessageCursor{
public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
private LinkedList list = new LinkedList();
private Iterator iter = null;
/**

View File

@ -82,6 +82,14 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
delegate.recoverSubscription(clientId, subscriptionName, listener);
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
return delegate.getNextMessageToDeliver(clientId,subscriptionName);
}
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@ -100,4 +108,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
return delegate.getMessageCount(clientId,subscriberName);
}
}

View File

@ -1,95 +1,134 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
/**
* A MessageStore for durable topic subscriptions
*
*
* @version $Revision: 1.4 $
*/
public interface TopicMessageStore extends MessageStore {
public interface TopicMessageStore extends MessageStore{
/**
* Stores the last acknowledged messgeID for the given subscription
* so that we can recover and commence dispatching messages from the last
* checkpoint
* @param context TODO
* Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
* messages from the last checkpoint
*
* @param context
* @param clientId
* @param subscriptionName
* @param messageId
* @param subscriptionPersistentId
* @throws IOException
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
throws IOException;
/**
* @param clientId
* @param subscriptionName
* @param sub
* @throws JMSException
* @throws IOException
* @throws JMSException
*/
public void deleteSubscription(String clientId, String subscriptionName) throws IOException;
public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
/**
* For the new subscription find the last acknowledged message ID and then find any new messages since then and
* dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
* then went down before acknowledging any messages, we need to know the correct point from which to recover from.
*
* @param clientId
* @param subscriptionName
* @param listener
* @param subscription
*
* @throws Exception
*/
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception;
/**
* For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
* messageId <p/>
*
* @param clientId
* @param subscriptionName
* @param lastMessageId
* @param maxReturned
* @param listener
*
* @throws Exception
*/
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception;
/**
* For the new subscription find the last acknowledged message ID
* and then find any new messages since then and dispatch them
* to the subscription.
* <p/>
* e.g. if we dispatched some messages to a new durable topic subscriber, then went down before
* acknowledging any messages, we need to know the correct point from which to recover from.
* @param subscription
*
* @throws Exception
* Get the next un-acknowledged message to deliver to a subscriber
* @param clientId
* @param subscriptionName
* @return the next message or null
* @throws IOException
*/
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
/**
* Get the number of messages ready to deliver from the store to a durable subscriber
* @param clientId
* @param subscriberName
* @return the outstanding message count
* @throws IOException
*/
public int getMessageCount(String clientId,String subscriberName) throws IOException;
/**
* Finds the subscriber entry for the given consumer info
*
* @param clientId TODO
* @param subscriptionName TODO
* @return
* @param clientId
* @param subscriptionName
* @return the SubscriptionInfo
* @throws IOException
*/
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
*
* @param clientId TODO
* @param subscriptionName TODO
* @return
* @return an array SubscriptionInfos
* @throws IOException
*/
public SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
* Inserts the subscriber info due to a subscription change
* <p/>
* If this is a new subscription and the retroactive is false, then the last
* message sent to the topic should be set as the last message acknowledged by they new
* subscription. Otherwise, if retroactive is true, then create the subscription without
* it having an acknowledged message so that on recovery, all message recorded for the
* topic get replayed.
* @param retroactive TODO
*
* Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
* is false, then the last message sent to the topic should be set as the last message acknowledged by they new
* subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
* message so that on recovery, all message recorded for the topic get replayed.
*
* @param clientId
* @param subscriptionName
* @param selector
* @param retroactive
* @throws IOException
*
*/
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException;
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException;
}

View File

@ -53,6 +53,9 @@ public interface JDBCAdapter {
public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
@ -79,5 +82,8 @@ public interface JDBCAdapter {
public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
}

View File

@ -89,6 +89,33 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
}
public void recoverMessageReference(String reference) throws Exception {
listener.recoverMessageReference(reference);
}
public void finished(){
listener.finished();
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
}
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
@ -148,4 +175,40 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
Message result = null;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
result = (Message) wireFormat.unmarshal(new ByteSequence(data));
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
}

View File

@ -53,6 +53,7 @@ public class Statements {
private String updateLastAckOfDurableSubStatement;
private String deleteSubscriptionStatement;
private String findAllDurableSubMessagesStatement;
private String findDurableSubMessagesStatement;
private String findAllDestinationsStatement;
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
@ -61,6 +62,8 @@ public class Statements {
private String[] dropSchemaStatements;
private String lockCreateStatement;
private String lockUpdateStatement;
private String nextDurableSubscriberMessageStatement;
private String durableSubscriberMessageCountStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@ -204,6 +207,47 @@ public class Statements {
}
return findAllDurableSubMessagesStatement;
}
public String getFindDurableSubMessagesStatement(){
if(findDurableSubMessagesStatement==null){
findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE ? >= ( select count(*) from "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > ?"+" ORDER BY M.ID)";
}
return findDurableSubMessagesStatement;
}
public String findAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {
findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
}
return findAllDurableSubMessagesStatement;
}
public String getNextDurableSubscriberMessageStatement(){
if (nextDurableSubscriberMessageStatement == null){
nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE 1 >= ( select count(*) from "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+" ORDER BY M.ID)";
}
return nextDurableSubscriberMessageStatement;
}
/**
* @return the durableSubscriberMessageCountStatement
*/
public String getDurableSubscriberMessageCountStatement(){
if (durableSubscriberMessageCountStatement==null){
durableSubscriberMessageCountStatement = "select count(*) from "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
}
return durableSubscriberMessageCountStatement;
}
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
@ -499,4 +543,26 @@ public class Statements {
public void setLockUpdateStatement(String lockUpdateStatement) {
this.lockUpdateStatement = lockUpdateStatement;
}
/**
* @param findDurableSubMessagesStatement the findDurableSubMessagesStatement to set
*/
public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){
this.findDurableSubMessagesStatement=findDurableSubMessagesStatement;
}
/**
* @param nextDurableSubscriberMessageStatement the nextDurableSubscriberMessageStatement to set
*/
public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){
this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement;
}
/**
* @param durableSubscriberMessageCountStatement the durableSubscriberMessageCountStatement to set
*/
public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
}
}

View File

@ -408,6 +408,58 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,JDBCMessageRecoveryListener listener) throws Exception {
// dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
s.setLong(4,seq);
s.setInt(5,maxReturned);
rs = s.executeQuery();
if( statements.isUseExternalMessageReferences() ) {
while (rs.next()) {
listener.recoverMessageReference(rs.getString(2));
}
} else {
while (rs.next()) {
listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
}
}
}
finally {
close(rs);
close(s);
listener.finished();
}
}
public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName) throws SQLException, IOException{
PreparedStatement s=null;
ResultSet rs=null;
int result = 0;
try{
s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
s.setString(1,destination.getQualifiedName());
s.setString(2,clientId);
s.setString(3,subscriptionName);
rs=s.executeQuery();
result = rs.getInt(1);
}finally{
close(rs);
close(s);
}
return result;
}
/**
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
@ -609,6 +661,29 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
this.statements = statements;
}
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException,IOException{
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriberName);
rs = s.executeQuery();
if (!rs.next()) {
return null;
}
return getBinaryData(rs, 1);
}
finally {
close(rs);
close(s);
}
}
/*
* Useful for debugging.
public void dumpTables(Connection c, String destinationName, String clientId, String subscriptionName) throws SQLException {

View File

@ -604,11 +604,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
checkpoint(false, true);
newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90;
checkpoint(sync, true);
}
}
public JournalTransactionStore getTransactionStore() {
return transactionStore;
}

View File

@ -57,6 +57,12 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
@ -184,4 +190,16 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
return longTermStore.getAllSubscriptions();
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
}

View File

@ -71,6 +71,25 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
});
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
public void recoverMessage(Message message) throws Exception {
throw new IOException("Should not get called.");
}
public void recoverMessageReference(String messageReference) throws Exception {
RecordLocation loc = toRecordLocation(messageReference);
Message message = (Message) peristenceAdapter.readCommand(loc);
listener.recoverMessage(message);
}
public void finished(){
listener.finished();
}
});
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
@ -197,5 +216,17 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
}

View File

@ -148,6 +148,42 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
listener.finished();
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
if(list!=null){
boolean startFound=false;
int count = 0;
for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if (startFound || lastMessageId == null){
listener.recoverMessageReference(ref);
count++;
}
else if(startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else{
listener.recoverMessage(message);
count++;
}
}
}
listener.finished();
}
}else{
listener.finished();
}
}
public void delete(){
super.delete();
@ -172,4 +208,20 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
Iterator iter = list.iterator();
return (Message) (iter.hasNext() ? iter.next() : null);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
return list.size();
}
}

View File

@ -111,6 +111,40 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
listener.finished();
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean startFound=false;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
int count = 0;
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() && count < maxReturned;){
Map.Entry entry=(Entry) iter.next();
Object msg=entry.getValue();
if(msg.getClass()==String.class){
String ref=msg.toString();
if(startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}else if (startFound){
listener.recoverMessageReference(ref);
count++;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else if (startFound){
listener.recoverMessage(message);
count++;
}
}
}
listener.finished();
}
}
public void delete() {
super.delete();
@ -122,4 +156,34 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(entry.getKey().equals(lastAck)){
return (Message) entry.getValue();
}
}
}
return null;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
int result = 0;
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
result = messageTable.size();
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(entry.getKey().equals(lastAck)){
break;
}
result--;
}
}
return result;
}
}

View File

@ -94,6 +94,42 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
if(list!=null){
boolean startFound=false;
int count = 0;
for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if (startFound || lastMessageId == null){
listener.recoverMessageReference(ref);
count++;
}
else if(startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else{
listener.recoverMessage(message);
count++;
}
}
}
listener.finished();
}
}else{
listener.finished();
}
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
@ -291,5 +327,19 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
Iterator iter = list.iterator();
return (Message) (iter.hasNext() ? iter.next() : null);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
return list.size();
}
}

View File

@ -1,26 +1,9 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
log4j.rootLogger=WARN, out
log4j.logger.org.apache.activemq=DEBUG
log4j.logger.org.apache.activemq=INFO
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender

View File

@ -28,6 +28,7 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/**
* @version $Revision$
@ -52,7 +53,14 @@ public class InactiveDurableTopicTest extends TestCase{
super.setUp();
broker=new BrokerService();
broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
//broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
/*
DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
factory.setDataDirectoryFile(broker.getDataDirectory());
factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
factory.setUseJournal(false);
broker.setPersistenceFactory(factory);
*/
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
broker.start();
connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);