From b6ba20b965ec0fe97ead8f824a88d83d8e0d6400 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 9 Oct 2006 13:05:20 +0000 Subject: [PATCH] changes for https://issues.apache.org/activemq/browse/AMQ-845 - provide support for durable topic cursors git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@454368 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 3 +- .../region/DurableTopicSubscription.java | 6 +- .../broker/region/PrefetchSubscription.java | 8 +- .../cursors/AbstractPendingMessageCursor.java | 87 +- .../region/cursors/PendingMessageCursor.java | 21 +- .../cursors/StoreDurableSubscriberCursor.java | 119 ++- .../region/cursors/TopicStorePrefetch.java | 145 ++- .../apache/activemq/kaha/ListContainer.java | 140 ++- .../apache/activemq/kaha/impl/KahaStore.java | 2 +- .../CachedContainerListIterator.java | 4 - .../impl/container/ListContainerImpl.java | 169 ++-- .../store/ProxyTopicMessageStore.java | 12 +- .../activemq/store/TopicMessageStore.java | 28 +- .../activemq/store/jdbc/JDBCAdapter.java | 107 +- .../store/jdbc/JDBCTopicMessageStore.java | 77 +- .../activemq/store/jdbc/Statements.java | 84 +- .../jdbc/adapter/DefaultJDBCAdapter.java | 943 +++++++++--------- .../journal/JournalTopicMessageStore.java | 13 +- .../QuickJournalTopicMessageStore.java | 14 +- .../store/kahadaptor/ConsumerMessageRef.java | 58 ++ .../ConsumerMessageRefMarshaller.java | 69 ++ .../kahadaptor/KahaTopicMessageStore.java | 322 +++--- .../store/kahadaptor/TopicSubAck.java | 14 +- .../kahadaptor/TopicSubAckMarshaller.java | 4 +- .../store/kahadaptor/TopicSubContainer.java | 65 ++ .../store/memory/MemoryTopicMessageStore.java | 31 +- .../store/rapid/RapidTopicMessageStore.java | 267 ++--- .../org/apache/activemq/JMSConsumerTest.java | 2 +- .../region/cursors/CursorDurableTest.java | 215 ++++ .../region/cursors/KahaCursorDurableTest.java | 40 + 30 files changed, 1956 insertions(+), 1113 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index b56cb2f7ec..d35793b8ca 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -454,6 +454,7 @@ public class BrokerService implements Service, Serializable { if (broker != null) { stopper.stop(broker); } + tempDataStore.close(); if (isUseJmx()) { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); @@ -957,7 +958,7 @@ public class BrokerService implements Service, Serializable { /** * @return the tempDataStore */ - public Store getTempDataStore() { + public synchronized Store getTempDataStore() { if (tempDataStore == null){ String name = getTmpDataDirectory().getPath(); try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index efd0814f37..a5d905a914 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -41,9 +41,9 @@ public class DurableTopicSubscription extends PrefetchSubscription { private boolean active=false; public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { - //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName())); - // super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore())); - super(broker,context,info); + //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize())); + //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore())); + super(broker,context,info); this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 881486109e..1039e9eee5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -124,7 +124,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized public void add(MessageReference node) throws Exception{ enqueueCounter++; - if(!isFull()){ + //if(!isFull()){ + if(!isFull() && pending.isEmpty() && canDispatch(node)){ dispatch(node); }else{ optimizePrefetch(); @@ -196,8 +197,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } dispatchMatched(); return; - }else{ - // System.out.println("no match: "+ack.getLastMessageId()+","+messageId); } } } @@ -435,8 +434,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ /** * @param node * @param message - * TODO - * @return + * @return MessageDispatch */ protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ if( node == QueueMessageReference.NULL_MESSAGE ) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index a23d7af341..b13d7f19e0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -1,43 +1,96 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ + package org.apache.activemq.broker.region.cursors; +import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; - +import org.apache.activemq.broker.region.MessageReference; /** - * Default method holder for pending message (messages awaiting disptach to a consumer) cursor + * Abstract method holder for pending message (messages awaiting disptach to a + * consumer) cursor * * @version $Revision$ */ -public abstract class AbstractPendingMessageCursor implements PendingMessageCursor{ - +public class AbstractPendingMessageCursor implements PendingMessageCursor { + protected int maxBatchSize = 100; + public void start() throws Exception{ } - + public void stop() throws Exception{ } - - public void add(ConnectionContext context, Destination destination) throws Exception{ + + public void add(ConnectionContext context,Destination destination) + throws Exception{ } - public void remove(ConnectionContext context, Destination destination) throws Exception{ + public void remove(ConnectionContext context,Destination destination) + throws Exception{ } - - + public boolean isRecoveryRequired(){ return true; } + + public void addMessageFirst(MessageReference node) throws Exception{ + } + + public void addMessageLast(MessageReference node) throws Exception{ + } + + public void clear(){ + } + + public boolean hasNext(){ + return false; + } + + public boolean isEmpty(){ + return false; + } + + public MessageReference next(){ + return null; + } + + public void remove(){ + } + + public void reset(){ + } + + public int size(){ + return 0; + } + + public int getMaxBatchSize(){ + return maxBatchSize; + } + + public void setMaxBatchSize(int maxBatchSize){ + this.maxBatchSize=maxBatchSize; + } + + protected void fillBatch() throws Exception{ + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index ff03858fc8..44f66d2873 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -13,6 +13,8 @@ */ package org.apache.activemq.broker.region.cursors; +import java.io.IOException; + import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -55,14 +57,17 @@ public interface PendingMessageCursor extends Service{ /** * add message to await dispatch * @param node + * @throws IOException + * @throws Exception */ - public void addMessageLast(MessageReference node); + public void addMessageLast(MessageReference node) throws Exception; /** * add message to await dispatch * @param node + * @throws Exception */ - public void addMessageFirst(MessageReference node); + public void addMessageFirst(MessageReference node) throws Exception; /** * @return true if there pending messages to dispatch @@ -94,8 +99,18 @@ public interface PendingMessageCursor extends Service{ /** * Informs the Broker if the subscription needs to intervention to recover it's state * e.g. DurableTopicSubscriber may do - * @see org.apache.activemq.region.cursors.PendingMessageCursor * @return true if recovery required */ public boolean isRecoveryRequired(); + + /** + * @return the maximum batch size + */ + public int getMaxBatchSize(); + + /** + * Set the max batch size + * @param maxBatchSize + */ + public void setMaxBatchSize(int maxBatchSize); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index b725d8c63a..92aaf26223 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -11,6 +11,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region.cursors; import java.io.IOException; @@ -22,24 +23,28 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.Message; +import org.apache.activemq.kaha.Store; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + /** * perist pending messages pending message (messages awaiting disptach to a consumer) cursor * * @version $Revision$ */ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ + static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class); private int pendingCount=0; private String clientId; private String subscriberName; - private int maxBatchSize=10; - private LinkedList batchList=new LinkedList(); private Map topics=new HashMap(); private LinkedList storePrefetches=new LinkedList(); - private AtomicBoolean started=new AtomicBoolean(); + private boolean started; + private PendingMessageCursor nonPersistent; + private PendingMessageCursor currentCursor; /** * @param topic @@ -47,24 +52,26 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ * @param subscriberName * @throws IOException */ - public StoreDurableSubscriberCursor(String clientId,String subscriberName){ + public StoreDurableSubscriberCursor(String clientId,String subscriberName,Store store,int maxBatchSize){ this.clientId=clientId; this.subscriberName=subscriberName; + this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store); + storePrefetches.add(nonPersistent); } public synchronized void start() throws Exception{ - started.set(true); + started=true; for(Iterator i=storePrefetches.iterator();i.hasNext();){ - TopicStorePrefetch tsp=(TopicStorePrefetch) i.next(); + PendingMessageCursor tsp=(PendingMessageCursor)i.next(); tsp.start(); pendingCount+=tsp.size(); } } public synchronized void stop() throws Exception{ - started.set(false); + started=false; for(Iterator i=storePrefetches.iterator();i.hasNext();){ - TopicStorePrefetch tsp=(TopicStorePrefetch) i.next(); + PendingMessageCursor tsp=(PendingMessageCursor)i.next(); tsp.stop(); } pendingCount=0; @@ -78,10 +85,11 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ * @throws Exception */ public synchronized void add(ConnectionContext context,Destination destination) throws Exception{ - TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName); + TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName); + tsp.setMaxBatchSize(getMaxBatchSize()); topics.put(destination,tsp); storePrefetches.add(tsp); - if(started.get()){ + if(started){ tsp.start(); pendingCount+=tsp.size(); } @@ -95,7 +103,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ * @throws Exception */ public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{ - TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination); + Object tsp=topics.remove(destination); if(tsp!=null){ storePrefetches.remove(tsp); } @@ -119,12 +127,32 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ return false; } - public synchronized void addMessageFirst(MessageReference node){ - pendingCount++; + public synchronized void addMessageFirst(MessageReference node) throws IOException{ + if(started){ + throw new RuntimeException("This shouldn't be called!"); + } } - public synchronized void addMessageLast(MessageReference node){ - pendingCount++; + public synchronized void addMessageLast(MessageReference node) throws Exception{ + if(started){ + if(node!=null){ + Message msg=node.getMessage(); + if(!msg.isPersistent()){ + nonPersistent.addMessageLast(node); + }else{ + Destination dest=msg.getRegionDestination(); + TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest); + if(tsp!=null){ + tsp.addMessageLast(node); + // if the store has been empty - then this message is next to dispatch + if((pendingCount-nonPersistent.size())<=0){ + tsp.nextToDispatch(node.getMessageId()); + } + } + } + pendingCount++; + } + } } public void clear(){ @@ -132,49 +160,56 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } public synchronized boolean hasNext(){ - return !isEmpty(); - } - - public synchronized MessageReference next(){ - MessageReference result=null; - if(!isEmpty()){ - if(batchList.isEmpty()){ - try{ - fillBatch(); - }catch(Exception e){ - log.error("Couldn't fill batch from store ",e); - throw new RuntimeException(e); - } - } - if(!batchList.isEmpty()){ - result=(MessageReference) batchList.removeFirst(); + boolean result=pendingCount>0; + if(result){ + try{ + currentCursor=getNextCursor(); + }catch(Exception e){ + log.error("Failed to get current cursor ",e); + throw new RuntimeException(e); } + result=currentCursor!=null?currentCursor.hasNext():false; } return result; } + public synchronized MessageReference next(){ + return currentCursor!=null?currentCursor.next():null; + } + public synchronized void remove(){ + if(currentCursor!=null){ + currentCursor.remove(); + } pendingCount--; } - public void reset(){ - batchList.clear(); + public synchronized void reset(){ + for(Iterator i=storePrefetches.iterator();i.hasNext();){ + AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); + tsp.reset(); + } } public int size(){ return pendingCount; } - private synchronized void fillBatch() throws Exception{ - for(Iterator i=storePrefetches.iterator();i.hasNext();){ - TopicStorePrefetch tsp=(TopicStorePrefetch) i.next(); - tsp.fillBatch(); - if(batchList.size()>=maxBatchSize){ - break; + protected synchronized PendingMessageCursor getNextCursor() throws Exception{ + if(currentCursor==null||currentCursor.isEmpty()){ + currentCursor=null; + for(Iterator i=storePrefetches.iterator();i.hasNext();){ + AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); + tsp.setMaxBatchSize(getMaxBatchSize()); + if(tsp.hasNext()){ + currentCursor=tsp; + break; + } } + // round-robin + Object obj=storePrefetches.removeFirst(); + storePrefetches.addLast(obj); } - // round-robin - Object obj=storePrefetches.removeFirst(); - storePrefetches.addLast(obj); + return currentCursor; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 2e44315776..8925391e46 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -1,20 +1,27 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ + package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.LinkedList; +import javax.jms.JMSException; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; @@ -23,134 +30,114 @@ import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + /** - * perist pending messages pending message (messages awaiting disptach to a consumer) cursor + * perist pending messages pending message (messages awaiting disptach to a + * consumer) cursor * * @version $Revision$ */ -class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{ +class TopicStorePrefetch extends AbstractPendingMessageCursor implements + MessageRecoveryListener { + static private final Log log=LogFactory.getLog(TopicStorePrefetch.class); - private Topic topic; + private TopicMessageStore store; - private LinkedList batchList; + private final LinkedList batchList=new LinkedList(); private String clientId; private String subscriberName; - private int pendingCount=0; private MessageId lastMessageId; - private int maxBatchSize=10; + private Destination regionDestination; /** * @param topic - * @param batchList * @param clientId * @param subscriberName * @throws IOException */ - public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){ - this.topic=topic; - this.store=(TopicMessageStore) topic.getMessageStore(); - this.batchList=batchList; + public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){ + this.regionDestination = topic; + this.store=(TopicMessageStore)topic.getMessageStore(); this.clientId=clientId; this.subscriberName=subscriberName; } public void start() throws Exception{ - pendingCount=store.getMessageCount(clientId,subscriberName); - System.err.println("Pending count = "+pendingCount); } public void stop() throws Exception{ - pendingCount=0; - lastMessageId=null; + store.resetBatching(clientId,clientId,null); } /** * @return true if there are no pending messages */ public boolean isEmpty(){ - return pendingCount==0; + return batchList.isEmpty(); } - - /** - * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber - * may do - * - * @see org.apache.activemq.region.cursors.PendingMessageCursor - * @return true if recovery required - */ - public boolean isRecoveryRequired(){ - return false; - } - - public synchronized void addMessageFirst(MessageReference node){ - pendingCount++; - } - - public synchronized void addMessageLast(MessageReference node){ - pendingCount++; - } - - public void clear(){ - pendingCount=0; - lastMessageId=null; + + public synchronized int size(){ + try{ + return store.getMessageCount(clientId,subscriberName); + }catch(IOException e){ + log.error(this + " Failed to get the outstanding message count from the store",e); + throw new RuntimeException(e); + } } public synchronized boolean hasNext(){ + if(isEmpty()){ + try{ + fillBatch(); + }catch(Exception e){ + log.error("Failed to fill batch",e); + throw new RuntimeException(e); + } + } return !isEmpty(); } public synchronized MessageReference next(){ - MessageReference result=null; - if(!isEmpty()){ - if(batchList.isEmpty()){ - try{ - fillBatch(); - }catch(Exception e){ - log.error(topic.getDestination()+" Couldn't fill batch from store ",e); - throw new RuntimeException(e); - } - } - result=(MessageReference) batchList.removeFirst(); - } + Message result = (Message)batchList.removeFirst(); + result.setRegionDestination(regionDestination); return result; } - public synchronized void remove(){ - pendingCount--; - } - public void reset(){ - batchList.clear(); - } - - public int size(){ - return pendingCount; } // MessageRecoveryListener implementation - public void finished(){} + public void finished(){ + } public void recoverMessage(Message message) throws Exception{ + message.setRegionDestination(regionDestination); batchList.addLast(message); } - public void recoverMessageReference(String messageReference) throws Exception{ + public void recoverMessageReference(String messageReference) + throws Exception{ // shouldn't get called throw new RuntimeException("Not supported"); } // implementation protected void fillBatch() throws Exception{ - if(pendingCount<=0){ - pendingCount=store.getMessageCount(clientId,subscriberName); - } - if(pendingCount>0){ - store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this); - // this will add more messages to the batch list - if(!batchList.isEmpty()){ - Message message=(Message) batchList.getLast(); - lastMessageId=message.getMessageId(); - } + store.recoverNextMessages(clientId,subscriberName,lastMessageId, + maxBatchSize,this); + // this will add more messages to the batch list + if(!batchList.isEmpty()){ + Message message=(Message)batchList.getLast(); + lastMessageId=message.getMessageId(); } } + + public String toString() { + return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")"; + } + + synchronized void nextToDispatch(MessageId id) throws Exception { + lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id); + store.resetBatching(clientId,clientId,id); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java b/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java index 02e1b3b02e..4099b65b1a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java @@ -1,58 +1,55 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.kaha; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; + /** - *Represents a container of persistent objects in the store - *Acts as a map, but values can be retrieved in insertion order + * Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion + * order * * @version $Revision: 1.2 $ */ public interface ListContainer extends List{ + /** - * The container is created or retrieved in - * an unloaded state. - * load populates the container will all the indexes used etc - * and should be called before any operations on the container + * The container is created or retrieved in an unloaded state. load populates the container will all the indexes + * used etc and should be called before any operations on the container */ public void load(); - + /** * unload indexes from the container - * + * */ public void unload(); - + /** * @return true if the indexes are loaded */ public boolean isLoaded(); - - + /** - * For homogenous containers can set a custom marshaller for loading values - * The default uses Object serialization - * @param marshaller + * For homogenous containers can set a custom marshaller for loading values The default uses Object serialization + * + * @param marshaller */ public void setMarshaller(Marshaller marshaller); + /** * @return the id the MapContainer was create with */ @@ -62,46 +59,46 @@ public interface ListContainer extends List{ * @return the number of values in the container */ public int size(); - + /** * Inserts the given element at the beginning of this list. - * + * * @param o the element to be inserted at the beginning of this list. */ public void addFirst(Object o); /** - * Appends the given element to the end of this list. (Identical in - * function to the add method; included only for consistency.) - * + * Appends the given element to the end of this list. (Identical in function to the add method; included + * only for consistency.) + * * @param o the element to be inserted at the end of this list. */ public void addLast(Object o); - + /** * Removes and returns the first element from this list. - * + * * @return the first element from this list. - * @throws NoSuchElementException if this list is empty. + * @throws NoSuchElementException if this list is empty. */ public Object removeFirst(); /** * Removes and returns the last element from this list. - * + * * @return the last element from this list. - * @throws NoSuchElementException if this list is empty. + * @throws NoSuchElementException if this list is empty. */ public Object removeLast(); - - + /** * remove an objecr from the list without retrieving the old value from the store + * * @param position * @return true if successful */ public boolean doRemove(int position); - + /** * @return the maximumCacheSize */ @@ -111,46 +108,87 @@ public interface ListContainer extends List{ * @param maximumCacheSize the maximumCacheSize to set */ public void setMaximumCacheSize(int maximumCacheSize); - + /** * clear any cached values */ public void clearCache(); - + /** * add an Object to the list but get a StoreEntry of its position + * * @param object * @return the entry in the Store */ public StoreEntry placeLast(Object object); - + /** * insert an Object in first position int the list but get a StoreEntry of its position + * * @param object * @return the location in the Store */ public StoreEntry placeFirst(Object object); - + /** * Advanced feature = must ensure the object written doesn't overwrite other objects in the container - * @param entry - * @param object + * + * @param entry + * @param object */ - public void update(StoreEntry entry, Object object); - + public void update(StoreEntry entry,Object object); + /** * Retrieve an Object from the Store by its location + * * @param entry * @return the Object at that entry */ public Object get(StoreEntry entry); - + + /** + * Get the StoreEntry for the first item of the list + * + * @return the first StoreEntry or null if the list is empty + */ + public StoreEntry getFirst(); + + /** + * Get yjr StoreEntry for the last item of the list + * + * @return the last StoreEntry or null if the list is empty + */ + public StoreEntry getLast(); + + /** + * Get the next StoreEntry from the list + * + * @param entry + * @return the next StoreEntry or null + */ + public StoreEntry getNext(StoreEntry entry); + + /** + * Get the previous StoreEntry from the list + * + * @param entry + * @return the previous store entry or null + */ + public StoreEntry getPrevious(StoreEntry entry); + /** * remove the Object at the StoreEntry + * * @param entry * @return true if successful */ public boolean remove(StoreEntry entry); - + /** + * It's possible that a StoreEntry could be come stale + * this will return an upto date entry for the StoreEntry position + * @param entry old entry + * @return a refreshed StoreEntry + */ + public StoreEntry refresh(StoreEntry entry); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java index bc37be4c04..e689b9a088 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java @@ -381,7 +381,7 @@ public class KahaStore implements Store{ if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){ throw new RuntimeException("Unknown IndexType: "+type); } - this.indexType=indexType; + this.indexType=type; } public synchronized void initialize() throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java index 0eb051cfe1..87084543d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java @@ -18,9 +18,7 @@ package org.apache.activemq.kaha.impl.container; import java.util.ListIterator; - import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.kaha.impl.index.IndexLinkedList; /** * @version $Revision$ @@ -28,14 +26,12 @@ import org.apache.activemq.kaha.impl.index.IndexLinkedList; public class CachedContainerListIterator implements ListIterator{ protected ListContainerImpl container; - protected IndexLinkedList list; protected int pos = 0; protected int nextPos =0; protected StoreEntry currentItem; protected CachedContainerListIterator(ListContainerImpl container,int start){ this.container=container; - this.list=list; this.pos=start; this.nextPos = this.pos; } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java index cf88348e99..6f736c1f49 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java @@ -1,19 +1,15 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.kaha.impl.container; @@ -52,8 +48,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine protected int maximumCacheSize=100; protected IndexItem lastCached; - public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType) - throws IOException{ + public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager, + String indexType) throws IOException{ super(id,root,indexManager,dataManager,indexType); } @@ -462,15 +458,15 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine indexList.add(index,item); itemAdded(item,index,element); } - - protected StoreEntry internalAddLast(Object o) { + + protected StoreEntry internalAddLast(Object o){ load(); IndexItem item=writeLast(o); indexList.addLast(item); itemAdded(item,indexList.size()-1,o); return item; } - + protected StoreEntry internalAddFirst(Object o){ load(); IndexItem item=writeFirst(o); @@ -486,8 +482,6 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine itemAdded(item,index,element); return item; } - - protected StoreEntry internalGet(int index){ load(); @@ -623,27 +617,29 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } return result; } - + /** * add an Object to the list but get a StoreEntry of its position + * * @param object * @return the entry in the Store */ - public synchronized StoreEntry placeLast(Object object) { - StoreEntry item = internalAddLast(object); + public synchronized StoreEntry placeLast(Object object){ + StoreEntry item=internalAddLast(object); return item; } - + /** * insert an Object in first position int the list but get a StoreEntry of its position + * * @param object * @return the location in the Store */ - public synchronized StoreEntry placeFirst(Object object) { - StoreEntry item = internalAddFirst(object); + public synchronized StoreEntry placeFirst(Object object){ + StoreEntry item=internalAddFirst(object); return item; } - + /** * @param entry * @param object @@ -651,41 +647,90 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine */ public void update(StoreEntry entry,Object object){ try{ - dataManager.updateItem(entry.getValueDataItem(),marshaller, object); + dataManager.updateItem(entry.getValueDataItem(),marshaller,object); }catch(IOException e){ throw new RuntimeException(e); } - + } + + /** + * Retrieve an Object from the Store by its location + * + * @param entry + * @return the Object at that entry + */ + public synchronized Object get(StoreEntry entry){ + load(); + return getValue(entry); + } + + /** + * remove the Object at the StoreEntry + * + * @param entry + * @return true if successful + */ + public synchronized boolean remove(StoreEntry entry){ + IndexItem item=(IndexItem)entry; + load(); + boolean result=false; + if(item!=null){ + clearCache(); + remove(item); + result = true; + } + return result; + } + + /** + * Get the StoreEntry for the first item of the list + * + * @return the first StoreEntry or null if the list is empty + */ + public synchronized StoreEntry getFirst(){ + return indexList.getFirst(); + } + + /** + * Get yjr StoreEntry for the last item of the list + * + * @return the last StoreEntry or null if the list is empty + */ + public synchronized StoreEntry getLast(){ + return indexList.getLast(); + } + + /** + * Get the next StoreEntry from the list + * + * @param entry + * @return the next StoreEntry or null + */ + public synchronized StoreEntry getNext(StoreEntry entry){ + IndexItem item=(IndexItem)entry; + return indexList.getNextEntry(item); + } + + /** + * Get the previous StoreEntry from the list + * + * @param entry + * @return the previous store entry or null + */ + public synchronized StoreEntry getPrevious(StoreEntry entry){ + IndexItem item=(IndexItem)entry; + return indexList.getPrevEntry(item); } /** - * Retrieve an Object from the Store by its location - * @param entry - * @return the Object at that entry - */ - public synchronized Object get(StoreEntry entry) { - load(); - return getValue(entry); - } - - /** - * remove the Object at the StoreEntry - * @param entry - * @return true if successful - */ - public synchronized boolean remove(StoreEntry entry) { - IndexItem item = (IndexItem)entry; - load(); - boolean result = false; - if(item!=null){ - clearCache(); - IndexItem prev=indexList.getPrevEntry(item); - prev=prev!=null?prev:root; - IndexItem next=indexList.getNextEntry(item); - delete(item,prev,next); - } - return result; - } + * It's possible that a StoreEntry could be come stale + * this will return an upto date entry for the StoreEntry position + * @param entry old entry + * @return a refreshed StoreEntry + */ + public synchronized StoreEntry refresh(StoreEntry entry) { + return indexList.getEntry(entry); + } protected IndexItem writeLast(Object value){ IndexItem index=null; @@ -782,7 +827,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine if(item!=null){ try{ // ensure it's up to date - //item=indexList.getEntry(item); + // item=indexList.getEntry(item); StoreLocation data=item.getValueDataItem(); result=dataManager.readItem(marshaller,data); }catch(IOException e){ @@ -903,8 +948,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } /** - * @param cacheList - * the cacheList to set + * @param cacheList the cacheList to set */ public synchronized void setCacheList(LinkedList cacheList){ this.cacheList=cacheList; @@ -918,8 +962,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } /** - * @param lastCached - * the lastCached to set + * @param lastCached the lastCached to set */ public synchronized void setLastCached(IndexItem lastCached){ this.lastCached=lastCached; @@ -933,8 +976,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } /** - * @param maximumCacheSize - * the maximumCacheSize to set + * @param maximumCacheSize the maximumCacheSize to set */ public synchronized void setMaximumCacheSize(int maximumCacheSize){ this.maximumCacheSize=maximumCacheSize; @@ -948,12 +990,9 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine } /** - * @param offset - * the offset to set + * @param offset the offset to set */ public synchronized void setOffset(int offset){ this.offset=offset; } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index ee1d8f26a7..227db29dba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -86,8 +86,16 @@ public class ProxyTopicMessageStore implements TopicMessageStore { delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener); } - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{ - return delegate.getNextMessageToDeliver(clientId,subscriptionName); + public void resetBatching(String clientId,String subscriptionName,MessageId id) { + delegate.resetBatching(clientId,subscriptionName,id); + } + + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id); + } + + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id); } public ActiveMQDestination getDestination() { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java index 36474683e4..654c4156a1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java @@ -78,15 +78,37 @@ public interface TopicMessageStore extends MessageStore{ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, MessageRecoveryListener listener) throws Exception; + /** + * A hint to the Store to reset any batching state for a durable subsriber + * @param clientId + * @param subscriptionName + * @param nextToDispatch + * + */ + public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch); /** - * Get the next un-acknowledged message to deliver to a subscriber + * Get the next messageId to deliver to a subscriber after the MessageId provided * @param clientId * @param subscriptionName - * @return the next message or null + * @param id + * @return the next messageId or null * @throws IOException + * @throws Exception */ - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException; + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception; + + + /** + * Get the previous messageId to deliver to a subscriber before the MessageId provided + * @param clientId + * @param subscriptionName + * @param id + * @return the next messageId or null + * @throws IOException + * @throws Exception + */ + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception; /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index a457a08ffc..08544aabcd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -1,26 +1,22 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.SQLException; import java.util.Set; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; @@ -28,62 +24,69 @@ import org.apache.activemq.command.SubscriptionInfo; /** * @version $Revision: 1.5 $ */ -public interface JDBCAdapter { - +public interface JDBCAdapter{ + public void setStatements(Statements statementProvider); - - public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException; - public abstract void doDropTables(TransactionContext c) throws SQLException, IOException; + public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException; - public abstract void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, - long expiration) throws SQLException, IOException; - public abstract void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; + public abstract void doDropTables(TransactionContext c) throws SQLException,IOException; - public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException; - public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException; + public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination, + byte[] data,long expiration) throws SQLException,IOException; - public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException; + public abstract void doAddMessageReference(TransactionContext c,MessageId messageId, + ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException; - public abstract void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) - throws Exception; + public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException; - public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, - IOException; + public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException; - public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, - String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception; - - public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, - String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception; + public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException; - public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, - String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException; + public abstract void doRecover(TransactionContext c,ActiveMQDestination destination, + JDBCMessageRecoveryListener listener) throws Exception; - public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, - String clientId, String subscriptionName) - throws SQLException, IOException; + public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId, + String subscriptionName,long seq) throws SQLException,IOException; - public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; + public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId, + String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception; - public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException; + public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId, + String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception; - public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) - throws SQLException, IOException; + public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId, + String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException; - public abstract void doDeleteOldMessages(TransactionContext c) - throws SQLException, IOException; + public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination, + String clientId,String subscriptionName) throws SQLException,IOException; - public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException; + public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException; - public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException; + public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName) + throws SQLException,IOException; + + public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId, + String subscriptionName) throws SQLException,IOException; + + public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException; + + public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException; + + public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException; public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences); - public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; + public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination) + throws SQLException,IOException; - public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException; - - public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException; + public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId, + String subscriptionName) throws SQLException,IOException; + public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination, + String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception; + + public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination, + String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception; } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 157b801a7b..c6166acf91 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -30,6 +30,7 @@ import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * @version $Revision: 1.6 $ @@ -110,12 +111,16 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); - throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); + } finally { c.close(); } } + + public void resetBatching(String clientId,String subscriptionName,MessageId id) { + } + /** * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo, * boolean) @@ -175,21 +180,75 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } } - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{ - Message result = null; - + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + + final MessageId result = new MessageId(); + final AtomicBoolean initalized = new AtomicBoolean(); TransactionContext c = persistenceAdapter.getTransactionContext(); try { - byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName); - result = (Message) wireFormat.unmarshal(new ByteSequence(data)); + long sequence = id != null ? id.getBrokerSequenceId() : -1; + adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() { + public void recoverMessage(long sequenceId, byte[] data) throws Exception { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId()); + initalized.set(true); + + } + public void recoverMessageReference(String reference) throws Exception { + result.setValue(reference); + initalized.set(true); + + } + + public void finished(){ + } + }); + } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); - throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); + throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e); } finally { c.close(); } - return result; + return initalized.get () ? result : null; + } + + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + final MessageId result = new MessageId(); + final AtomicBoolean initalized = new AtomicBoolean(); + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + long sequence = id != null ? id.getBrokerSequenceId() : -1; + adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() { + public void recoverMessage(long sequenceId, byte[] data) throws Exception { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + result.setProducerId(msg.getMessageId().getProducerId()); + result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId()); + result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId()); + initalized.set(true); + + } + public void recoverMessageReference(String reference) throws Exception { + result.setValue(reference); + initalized.set(true); + + } + + public void finished(){ + } + }); + + + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ",e); + throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e); + } finally { + c.close(); + } + return initalized.get () ? result : null; } public int getMessageCount(String clientId,String subscriberName) throws IOException{ @@ -200,7 +259,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); - throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); + throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); } finally { c.close(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 77ca9afeac..37c65bd2c9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -64,6 +64,8 @@ public class Statements { private String lockUpdateStatement; private String nextDurableSubscriberMessageStatement; private String durableSubscriberMessageCountStatement; + private String nextDurableSubscriberMessageIdStatement; + private String prevDurableSubscriberMessageIdStatement; private boolean useLockCreateWhereClause; public String[] getCreateSchemaStatements() { @@ -210,10 +212,9 @@ public class Statements { public String getFindDurableSubMessagesStatement(){ if(findDurableSubMessagesStatement==null){ - findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, " - +getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM " - +getFullMessageTableName()+" M, " + getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)"; + findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID"; } return findDurableSubMessagesStatement; } @@ -229,10 +230,9 @@ public class Statements { public String getNextDurableSubscriberMessageStatement(){ if (nextDurableSubscriberMessageStatement == null){ - nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, " - +getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM " - +getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)"; + nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID "; } return nextDurableSubscriberMessageStatement; } @@ -240,14 +240,55 @@ public class Statements { /** * @return the durableSubscriberMessageCountStatement */ + + public String getDurableSubscriberMessageCountStatement(){ if (durableSubscriberMessageCountStatement==null){ - durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " - +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; + durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; } return durableSubscriberMessageCountStatement; } + + /** + * @return the nextDurableSubscriberMessageIdStatement + */ + public String getNextDurableSubscriberMessageIdStatement(){ + if (nextDurableSubscriberMessageIdStatement==null) { + nextDurableSubscriberMessageIdStatement = + "SELECT M.ID FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID "; + } + return nextDurableSubscriberMessageIdStatement; + } + + /** + * @return the prevDurableSubscriberMessageIdStatement + */ + /* + public String getPrevDurableSubscriberMessageIdStatement(){ + if(prevDurableSubscriberMessageIdStatement==null) { + prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + + " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID "; + } + return prevDurableSubscriberMessageIdStatement; + } + */ + + + public String getPrevDurableSubscriberMessageIdStatement(){ + if(prevDurableSubscriberMessageIdStatement==null) { + prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M " + + " WHERE M.CONTAINER=? " + + " AND M.ID - * sub-classing is encouraged to override the default - * implementation of methods to account for differences - * in JDBC Driver implementations. - *

- * The JDBCAdapter inserts and extracts BLOB data using the - * getBytes()/setBytes() operations. - *

+ * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter.

sub-classing is + * encouraged to override the default implementation of methods to account for differences in JDBC Driver + * implementations.

The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations.

* The databases/JDBC drivers that use this adapter are: *

- * + * * @org.apache.xbean.XBean element="defaultJDBCAdapter" * * @version $Revision: 1.10 $ */ -public class DefaultJDBCAdapter implements JDBCAdapter { - - private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class); +public class DefaultJDBCAdapter implements JDBCAdapter{ + private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class); protected Statements statements; protected boolean batchStatments=true; - protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { - s.setBytes(index, data); + protected void setBinaryData(PreparedStatement s,int index,byte data[]) throws SQLException{ + s.setBytes(index,data); } - protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { + protected byte[] getBinaryData(ResultSet rs,int index) throws SQLException{ return rs.getBytes(index); } - public void doCreateTables(TransactionContext c) throws SQLException, IOException { - Statement s = null; - try { - - // Check to see if the table already exists. If it does, then don't log warnings during startup. - // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table - boolean alreadyExists = false; + public void doCreateTables(TransactionContext c) throws SQLException,IOException{ + Statement s=null; + try{ + // Check to see if the table already exists. If it does, then don't log warnings during startup. + // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version + // of the table + boolean alreadyExists=false; ResultSet rs=null; - try { - rs= c.getConnection().getMetaData().getTables(null,null, statements.getFullMessageTableName(), new String[] {"TABLE"}); - alreadyExists = rs.next(); - } catch (Throwable ignore) { - } finally { + try{ + rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(), + new String[] { "TABLE" }); + alreadyExists=rs.next(); + }catch(Throwable ignore){ + }finally{ close(rs); } - - s = c.getConnection().createStatement(); - String[] createStatments = statements.getCreateSchemaStatements(); - for (int i = 0; i < createStatments.length; i++) { + s=c.getConnection().createStatement(); + String[] createStatments=statements.getCreateSchemaStatements(); + for(int i=0;i D.LAST_ACKED_ID" - +" ORDER BY M.ID"); - s.setString(1,destinationName); - s.setString(2,clientId); - s.setString(3,subscriptionName); - printQuery(s,System.out); - } - - public void dumpTables(Connection c) throws SQLException { - printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); - printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); - } - - private void printQuery(Connection c, String query, PrintStream out) throws SQLException { - printQuery(c.prepareStatement(query), out); - } - - private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { - - ResultSet set=null; - try { - set = s.executeQuery(); - ResultSetMetaData metaData = set.getMetaData(); - for( int i=1; i<= metaData.getColumnCount(); i++ ) { - if(i==1) - out.print("||"); - out.print(metaData.getColumnName(i)+"||"); + /** + * @param c + * @param destination + * @param clientId + * @param subscriberName + * @param id + * @return the previous Id + * @throws Exception + * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetPrevDurableSubscriberMessageStatement(org.apache.activemq.store.jdbc.TransactionContext, + * org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String) + */ + public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination, + String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{ + PreparedStatement s=null; + ResultSet rs=null; + try{ + s=c.getConnection().prepareStatement(statements.getPrevDurableSubscriberMessageIdStatement()); + s.setString(1,destination.getQualifiedName()); + s.setLong(2,id); + rs=s.executeQuery(); + if (rs.next()) { + listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); } - out.println(); - while(set.next()) { - for( int i=1; i<= metaData.getColumnCount(); i++ ) { - if(i==1) - out.print("|"); - out.print(set.getString(i)+"|"); - } - out.println(); - } - } finally { - try { set.close(); } catch (Throwable ignore) {} - try { s.close(); } catch (Throwable ignore) {} + listener.finished(); + + }finally{ + close(rs); + close(s); } } - */ + + /** + * @param c + * @param destination + * @param clientId + * @param subscriberName + * @param id + * @return the next id + * @throws SQLException + * @throws IOException + * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetNextDurableSubscriberMessageIdStatement(org.apache.activemq.store.jdbc.TransactionContext, + * org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String) + */ + public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination, + String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{ + PreparedStatement s=null; + ResultSet rs=null; + try{ + s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageIdStatement()); + s.setString(1,destination.getQualifiedName()); + s.setLong(2,id); + rs=s.executeQuery(); + if (rs.next()) { + listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2)); + } + listener.finished(); + + }finally{ + close(rs); + close(s); + } + } + /* + * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String + * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, + * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID, + * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND + * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID"); + * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); + * printQuery(s,System.out); } + * + * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", + * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); } + * + * private void printQuery(Connection c, String query, PrintStream out) throws SQLException { + * printQuery(c.prepareStatement(query), out); } + * + * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { + * + * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<= + * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); } + * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|"); + * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {} + * try { s.close(); } catch (Throwable ignore) {} } } + */ } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java index 6a4efb00ee..ed32bda4df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java @@ -190,15 +190,24 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top return longTermStore.getAllSubscriptions(); } - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{ + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ this.peristenceAdapter.checkpoint(true, true); - return longTermStore.getNextMessageToDeliver(clientId,subscriptionName); + return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id); + } + + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + this.peristenceAdapter.checkpoint(true, true); + return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id); } public int getMessageCount(String clientId,String subscriberName) throws IOException{ this.peristenceAdapter.checkpoint(true, true); return longTermStore.getMessageCount(clientId,subscriberName); } + + public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) { + longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java index 5501d4bae6..e50fd634b5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java @@ -217,15 +217,25 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl return longTermStore.getAllSubscriptions(); } - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{ + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ this.peristenceAdapter.checkpoint(true, true); - return longTermStore.getNextMessageToDeliver(clientId,subscriptionName); + return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id); } + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{ + this.peristenceAdapter.checkpoint(true, true); + return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id); + } + + public int getMessageCount(String clientId,String subscriberName) throws IOException{ this.peristenceAdapter.checkpoint(true, true); return longTermStore.getMessageCount(clientId,subscriberName); } + + public void resetBatching(String clientId,String subscriptionName,MessageId nextId) { + longTermStore.resetBatching(clientId,subscriptionName,nextId); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java new file mode 100644 index 0000000000..b50ab0cdef --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.store.kahadaptor; + +import org.apache.activemq.kaha.StoreEntry; + +/** + * Holds information for location of message + * + * @version $Revision: 1.10 $ + */ +public class ConsumerMessageRef{ + + private StoreEntry messageEntry; + private StoreEntry ackEntry; + + /** + * @return the ackEntry + */ + public StoreEntry getAckEntry(){ + return this.ackEntry; + } + + /** + * @param ackEntry the ackEntry to set + */ + public void setAckEntry(StoreEntry ackEntry){ + this.ackEntry=ackEntry; + } + + /** + * @return the messageEntry + */ + public StoreEntry getMessageEntry(){ + return this.messageEntry; + } + + /** + * @param messageEntry the messageEntry to set + */ + public void setMessageEntry(StoreEntry messageEntry){ + this.messageEntry=messageEntry; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java new file mode 100644 index 0000000000..f1c49781e9 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadaptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.impl.index.IndexItem; + + +/** + * Marshall a TopicSubAck + * @version $Revision: 1.10 $ + */ +public class ConsumerMessageRefMarshaller implements Marshaller{ + + + /** + * @param object + * @param dataOut + * @throws IOException + * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput) + */ + public void writePayload(Object object,DataOutput dataOut) throws IOException{ + ConsumerMessageRef ref = (ConsumerMessageRef) object; + IndexItem item = (IndexItem)ref.getMessageEntry(); + dataOut.writeLong(item.getOffset()); + item.write(dataOut); + item = (IndexItem)ref.getAckEntry(); + dataOut.writeLong(item.getOffset()); + item.write(dataOut); + + } + + /** + * @param dataIn + * @return payload + * @throws IOException + * @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput) + */ + public Object readPayload(DataInput dataIn) throws IOException{ + ConsumerMessageRef ref = new ConsumerMessageRef(); + IndexItem item = new IndexItem(); + item.setOffset(dataIn.readLong()); + item.read(dataIn); + ref.setMessageEntry(item); + item = new IndexItem(); + item.setOffset(dataIn.readLong()); + item.read(dataIn); + ref.setAckEntry(item); + return ref; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 9aa8270ff9..3cc3bb9b17 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -1,27 +1,23 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.store.kahadaptor; import java.io.IOException; import java.util.Iterator; import java.util.Map; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -32,68 +28,69 @@ import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.kaha.StringMarshaller; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + /** * @version $Revision: 1.5 $ */ -public class KahaTopicMessageStore implements TopicMessageStore{ +public class KahaTopicMessageStore implements TopicMessageStore{ + private ActiveMQDestination destination; private ListContainer ackContainer; private ListContainer messageContainer; private Map subscriberContainer; private Store store; - private Map subscriberAcks=new ConcurrentHashMap(); + private Map subscriberMessages=new ConcurrentHashMap(); public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer, - MapContainer subsContainer,ActiveMQDestination destination) throws IOException{ - this.messageContainer = messageContainer; - this.destination = destination; + MapContainer subsContainer,ActiveMQDestination destination) throws IOException{ + this.messageContainer=messageContainer; + this.destination=destination; this.store=store; this.ackContainer=ackContainer; subscriberContainer=subsContainer; // load all the Ack containers for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){ Object key=i.next(); - addSubscriberAckContainer(key); + addSubscriberMessageContainer(key); } } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ - int subscriberCount=subscriberAcks.size(); + int subscriberCount=subscriberMessages.size(); if(subscriberCount>0){ - StoreEntry entry = messageContainer.placeLast(message); - TopicSubAck tsa = new TopicSubAck(); + StoreEntry messageEntry=messageContainer.placeLast(message); + TopicSubAck tsa=new TopicSubAck(); tsa.setCount(subscriberCount); - tsa.setStoreEntry(entry); - StoreEntry ackEntry = ackContainer.placeLast(tsa); - for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){ - Object key=i.next(); - ListContainer container=store.getListContainer(key,"durable-subs"); - container.add(ackEntry); + tsa.setMessageEntry(messageEntry); + StoreEntry ackEntry=ackContainer.placeLast(tsa); + for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ + TopicSubContainer container=(TopicSubContainer)i.next(); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(ackEntry); + ref.setMessageEntry(messageEntry); + container.getListContainer().add(ref); } - } } public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName, MessageId messageId) throws IOException{ String subcriberId=getSubscriptionKey(clientId,subscriptionName); - ListContainer container=(ListContainer)subscriberAcks.get(subcriberId); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - StoreEntry ackEntry=(StoreEntry)container.removeFirst(); - if(ackEntry!=null){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry); + ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ if(tsa.decrementCount()<=0){ - ackContainer.remove(ackEntry); - messageContainer.remove(tsa.getStoreEntry()); - }else { - ackContainer.update(ackEntry,tsa); + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); + }else{ + ackContainer.update(ref.getAckEntry(),tsa); } } } @@ -101,11 +98,11 @@ public class KahaTopicMessageStore implements TopicMessageStore{ } public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{ - return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); + return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); } public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) - throws IOException{ + throws IOException{ SubscriptionInfo info=new SubscriptionInfo(); info.setDestination(destination); info.setClientId(clientId); @@ -117,23 +114,23 @@ public class KahaTopicMessageStore implements TopicMessageStore{ if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - addSubscriberAckContainer(key); + addSubscriberMessageContainer(key); } public synchronized void deleteSubscription(String clientId,String subscriptionName){ String key=getSubscriptionKey(clientId,subscriptionName); subscriberContainer.remove(key); - ListContainer list=(ListContainer) subscriberAcks.get(key); - for(Iterator i=list.iterator();i.hasNext();){ - StoreEntry ackEntry=(StoreEntry)i.next(); - if(ackEntry!=null){ - TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ if(tsa.decrementCount()<=0){ - ackContainer.remove(ackEntry); - messageContainer.remove(tsa.getStoreEntry()); - }else { - ackContainer.update(ackEntry,tsa); + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); + }else{ + ackContainer.update(ref.getAckEntry(),tsa); } } } @@ -141,18 +138,18 @@ public class KahaTopicMessageStore implements TopicMessageStore{ } public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) - throws Exception{ + throws Exception{ String key=getSubscriptionKey(clientId,subscriptionName); - ListContainer list=(ListContainer) subscriberAcks.get(key); - if(list!=null){ - for(Iterator i=list.iterator();i.hasNext();){ - StoreEntry entry = (StoreEntry)i.next(); - Object msg=messageContainer.get(entry); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + if(container!=null){ + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + Object msg=messageContainer.get(ref.getMessageEntry()); if(msg!=null){ if(msg.getClass()==String.class){ - listener.recoverMessageReference((String) msg); + listener.recoverMessageReference((String)msg); }else{ - listener.recoverMessage((Message) msg); + listener.recoverMessage((Message)msg); } } listener.finished(); @@ -161,42 +158,40 @@ public class KahaTopicMessageStore implements TopicMessageStore{ listener.finished(); } } - + public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, - MessageRecoveryListener listener) throws Exception{ + MessageRecoveryListener listener) throws Exception{ String key=getSubscriptionKey(clientId,subscriptionName); - ListContainer list=(ListContainer) subscriberAcks.get(key); - if(list!=null){ - boolean startFound=false; - int count = 0; - for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){ - StoreEntry entry = (StoreEntry)i.next(); - Object msg=messageContainer.get(entry); - if(msg!=null){ - if(msg.getClass()==String.class){ - String ref=msg.toString(); - if (startFound || lastMessageId == null){ - listener.recoverMessageReference(ref); - count++; - } - else if(startFound||ref.equals(lastMessageId.toString())){ - startFound=true; - } - }else{ - Message message=(Message) msg; - if(startFound||message.getMessageId().equals(lastMessageId)){ - startFound=true; - }else{ - listener.recoverMessage(message); - count++; - } - } - } - listener.finished(); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + if(container!=null){ + int count=0; + StoreEntry entry=container.getBatchEntry(); + if(entry==null){ + entry=container.getListContainer().getFirst(); + }else{ + entry=container.getListContainer().refresh(entry); + entry=container.getListContainer().getNext(entry); + } + if(entry!=null){ + do{ + ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry); + Object msg=messageContainer.get(consumerRef.getMessageEntry()); + if(msg!=null){ + if(msg.getClass()==String.class){ + String ref=msg.toString(); + listener.recoverMessageReference(ref); + }else{ + Message message=(Message)msg; + listener.recoverMessage(message); + } + count++; + } + container.setBatchEntry(entry); + entry=container.getListContainer().getNext(entry); + }while(entry!=null&&count0){ ackContainer.put(id,count); - } else { + }else{ // no more references to message messageContainer so remove it messageContainer.remove(messageId.toString()); } } } } - } - catch (Throwable e) { - log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e); + }catch(Throwable e){ + log.debug("Could not replay acknowledge for message '"+messageId + +"'. Message may have already been acknowledged. reason: "+e); } } - /** * @param messageId * @param location * @param key */ - private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { - synchronized(this) { - lastLocation = location; - ackedLastAckLocations.put(key, messageId); - + private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){ + synchronized(this){ + lastLocation=location; + ackedLastAckLocations.put(key,messageId); String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName()); String id=messageId.toString(); - ListContainer container=(ListContainer) subscriberAcks.get(subcriberId); + ListContainer container=(ListContainer)subscriberAcks.get(subcriberId); if(container!=null){ - //container.remove(id); + // container.remove(id); container.removeFirst(); - AtomicInteger count=(AtomicInteger) ackContainer.remove(id); + AtomicInteger count=(AtomicInteger)ackContainer.remove(id); if(count!=null){ if(count.decrementAndGet()>0){ ackContainer.put(id,count); - } else { + }else{ // no more references to message messageContainer so remove it messageContainer.remove(messageId.toString()); } } } - } + } } - + protected String getSubscriptionKey(String clientId,String subscriberName){ String result=clientId+":"; result+=subscriberName!=null?subscriberName:"NOT_SET"; return result; } - - public RecordLocation checkpoint() throws IOException { - - ArrayList cpAckedLastAckLocations; - + public RecordLocation checkpoint() throws IOException{ + ArrayList cpAckedLastAckLocations; // swap out the hash maps.. - synchronized (this) { - cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values()); - this.ackedLastAckLocations = new HashMap(); + synchronized(this){ + cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values()); + this.ackedLastAckLocations=new HashMap(); } - - RecordLocation rc = super.checkpoint(); - if(!cpAckedLastAckLocations.isEmpty()) { + RecordLocation rc=super.checkpoint(); + if(!cpAckedLastAckLocations.isEmpty()){ Collections.sort(cpAckedLastAckLocations); - RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0); - if( rc == null || t.compareTo(rc)<0 ) { - rc = t; + RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0); + if(rc==null||t.compareTo(rc)<0){ + rc=t; } } - return rc; } - - public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + public void deleteSubscription(String clientId,String subscriptionName) throws IOException{ String key=getSubscriptionKey(clientId,subscriptionName); subscriberContainer.remove(key); - ListContainer list=(ListContainer) subscriberAcks.get(key); + ListContainer list=(ListContainer)subscriberAcks.get(key); for(Iterator i=list.iterator();i.hasNext();){ String id=i.next().toString(); - AtomicInteger count=(AtomicInteger) ackContainer.remove(id); + AtomicInteger count=(AtomicInteger)ackContainer.remove(id); if(count!=null){ if(count.decrementAndGet()>0){ ackContainer.put(id,count); @@ -316,30 +300,63 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe } } - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - return (SubscriptionInfo[]) subscriberContainer.values().toArray( + public SubscriptionInfo[] getAllSubscriptions() throws IOException{ + return (SubscriptionInfo[])subscriberContainer.values().toArray( new SubscriptionInfo[subscriberContainer.size()]); } protected void addSubscriberAckContainer(Object key) throws IOException{ - ListContainer container=store.getListContainer(key,"topic-subs"); + ListContainer container=store.getListContainer(key,"durable-subs"); Marshaller marshaller=new StringMarshaller(); container.setMarshaller(marshaller); subscriberAcks.put(key,container); } - - public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{ + + public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId) + throws IOException{ + MessageId result=null; + boolean getNext=false; String key=getSubscriptionKey(clientId,subscriptionName); - ListContainer list=(ListContainer) subscriberAcks.get(key); - Iterator iter = list.iterator(); - return (Message) (iter.hasNext() ? iter.next() : null); - + ListContainer list=(ListContainer)subscriberAcks.get(key); + Iterator iter=list.iterator(); + for(Iterator i=list.iterator();i.hasNext();){ + String id=i.next().toString(); + if(id.equals(messageId.toString())){ + getNext=true; + }else if(getNext){ + result=new MessageId(id); + break; + } + } + return result; } - + + public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId) + throws IOException{ + MessageId result=null; + String previousId=null; + String key=getSubscriptionKey(clientId,subscriptionName); + ListContainer list=(ListContainer)subscriberAcks.get(key); + Iterator iter=list.iterator(); + for(Iterator i=list.iterator();i.hasNext();){ + String id=i.next().toString(); + if(id.equals(messageId.toString())){ + if(previousId!=null){ + result=new MessageId(previousId); + } + break; + } + previousId=id; + } + return result; + } + public int getMessageCount(String clientId,String subscriberName) throws IOException{ String key=getSubscriptionKey(clientId,subscriberName); - ListContainer list=(ListContainer) subscriberAcks.get(key); + ListContainer list=(ListContainer)subscriberAcks.get(key); return list.size(); } + public void resetBatching(String clientId,String subscriptionName,MessageId nextId){ + } } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 485f5885ba..803eb128a9 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -507,7 +507,7 @@ public class JMSConsumerTest extends JmsTestSupport { session.commit(); // Only pick up the first message. - Message message1 = message1 = consumer.receive(1000); + Message message1 = consumer.receive(1000); assertNotNull(message1); // Don't acknowledge yet. This should keep our prefetch full. diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java new file mode 100644 index 0000000000..9d8025572a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java @@ -0,0 +1,215 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; +/** + * @version $Revision: 1.3 $ + */ +public class CursorDurableTest extends TestCase{ + + protected static final Log log = LogFactory.getLog(CursorDurableTest.class); + + protected static final int MESSAGE_COUNT=50; + protected static final int PREFETCH_SIZE = 5; + protected BrokerService broker; + protected String bindAddress="tcp://localhost:60706"; + protected int topicCount=0; + + public void testSendFirstThenConsume() throws Exception{ + ConnectionFactory factory=createConnectionFactory(); + Connection consumerConnection= getConsumerConnection(factory); + //create durable subs + MessageConsumer consumer = getConsumer(consumerConnection); + consumerConnection.close(); + + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getTopic(session)); + List senderList = new ArrayList(); + for (int i =0; i < MESSAGE_COUNT; i++) { + Message msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + producerConnection.close(); + + //now consume the messages + consumerConnection= getConsumerConnection(factory); + //create durable subs + consumer = getConsumer(consumerConnection); + List consumerList = new ArrayList(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = consumer.receive(); + consumerList.add(msg); + } + assertEquals(senderList,consumerList); + consumerConnection.close(); + } + + public void testSendWhilstConsume() throws Exception{ + ConnectionFactory factory=createConnectionFactory(); + Connection consumerConnection= getConsumerConnection(factory); + //create durable subs + MessageConsumer consumer = getConsumer(consumerConnection); + consumerConnection.close(); + + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getTopic(session)); + List senderList = new ArrayList(); + for (int i =0; i < MESSAGE_COUNT/10; i++) { + TextMessage msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + + + //now consume the messages + consumerConnection= getConsumerConnection(factory); + //create durable subs + consumer = getConsumer(consumerConnection); + final List consumerList = new ArrayList(); + + final CountDownLatch latch = new CountDownLatch(1); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message msg){ + try{ + //sleep to act as a slow consumer + //which will force a mix of direct and polled dispatching + //using the cursor on the broker + Thread.sleep(50); + }catch(Exception e){ + // TODO Auto-generated catch block + e.printStackTrace(); + } + consumerList.add(msg); + if (consumerList.size()==MESSAGE_COUNT) { + latch.countDown(); + } + + } + + }); + for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) { + TextMessage msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + + + latch.await(300000,TimeUnit.MILLISECONDS); + assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0); + assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size()); + assertEquals(senderList,consumerList); + producerConnection.close(); + consumerConnection.close(); + } + + + + protected Topic getTopic(Session session) throws JMSException{ + String topicName=getClass().getName(); + return session.createTopic(topicName); + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{ + Connection connection=fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + + } + + protected MessageConsumer getConsumer(Connection connection) throws Exception{ + Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Topic topic = getTopic(consumerSession); + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic,"testConsumer"); + return consumer; + } + + + + protected void setUp() throws Exception{ + if(broker==null){ + broker=createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception{ + super.tearDown(); + + if(broker!=null){ + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ + ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress); + Properties props = new Properties(); + props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE); + props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE); + cf.setProperties(props); + return cf; + } + + + + protected BrokerService createBroker() throws Exception{ + BrokerService answer=new BrokerService(); + configureBroker(answer); + answer.setDeleteAllMessagesOnStartup(true); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception{ + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java new file mode 100644 index 0000000000..745dd4eeda --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java @@ -0,0 +1,40 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.File; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +/** + * @version $Revision: 1.3 $ + */ +public class KahaCursorDurableTest extends CursorDurableTest{ + + protected static final Log log = LogFactory.getLog(KahaCursorDurableTest.class); + + + + protected void configureBroker(BrokerService answer) throws Exception{ + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest")); + answer.setPersistenceAdapter(adaptor); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +}