provide support for durable topic cursors

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@454368 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-10-09 13:05:20 +00:00
parent 2a22129edd
commit b6ba20b965
30 changed files with 1956 additions and 1113 deletions

View File

@ -454,6 +454,7 @@ public class BrokerService implements Service, Serializable {
if (broker != null) {
stopper.stop(broker);
}
tempDataStore.close();
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@ -957,7 +958,7 @@ public class BrokerService implements Service, Serializable {
/**
* @return the tempDataStore
*/
public Store getTempDataStore() {
public synchronized Store getTempDataStore() {
if (tempDataStore == null){
String name = getTmpDataDirectory().getPath();
try {

View File

@ -41,9 +41,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
//super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
// super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
//super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
//super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}

View File

@ -124,7 +124,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
if(!isFull()){
//if(!isFull()){
if(!isFull() && pending.isEmpty() && canDispatch(node)){
dispatch(node);
}else{
optimizePrefetch();
@ -196,8 +197,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
dispatchMatched();
return;
}else{
// System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
}
}
}
@ -435,8 +434,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
/**
* @param node
* @param message
* TODO
* @return
* @return MessageDispatch
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
if( node == QueueMessageReference.NULL_MESSAGE ) {

View File

@ -1,43 +1,96 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
/**
* Default method holder for pending message (messages awaiting disptach to a consumer) cursor
* Abstract method holder for pending message (messages awaiting disptach to a
* consumer) cursor
*
* @version $Revision$
*/
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor{
public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int maxBatchSize = 100;
public void start() throws Exception{
}
public void stop() throws Exception{
}
public void add(ConnectionContext context, Destination destination) throws Exception{
public void add(ConnectionContext context,Destination destination)
throws Exception{
}
public void remove(ConnectionContext context, Destination destination) throws Exception{
public void remove(ConnectionContext context,Destination destination)
throws Exception{
}
public boolean isRecoveryRequired(){
return true;
}
public void addMessageFirst(MessageReference node) throws Exception{
}
public void addMessageLast(MessageReference node) throws Exception{
}
public void clear(){
}
public boolean hasNext(){
return false;
}
public boolean isEmpty(){
return false;
}
public MessageReference next(){
return null;
}
public void remove(){
}
public void reset(){
}
public int size(){
return 0;
}
public int getMaxBatchSize(){
return maxBatchSize;
}
public void setMaxBatchSize(int maxBatchSize){
this.maxBatchSize=maxBatchSize;
}
protected void fillBatch() throws Exception{
}
}

View File

@ -13,6 +13,8 @@
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -55,14 +57,17 @@ public interface PendingMessageCursor extends Service{
/**
* add message to await dispatch
* @param node
* @throws IOException
* @throws Exception
*/
public void addMessageLast(MessageReference node);
public void addMessageLast(MessageReference node) throws Exception;
/**
* add message to await dispatch
* @param node
* @throws Exception
*/
public void addMessageFirst(MessageReference node);
public void addMessageFirst(MessageReference node) throws Exception;
/**
* @return true if there pending messages to dispatch
@ -94,8 +99,18 @@ public interface PendingMessageCursor extends Service{
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired();
/**
* @return the maximum batch size
*/
public int getMaxBatchSize();
/**
* Set the max batch size
* @param maxBatchSize
*/
public void setMaxBatchSize(int maxBatchSize);
}

View File

@ -11,6 +11,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@ -22,24 +23,28 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
private int pendingCount=0;
private String clientId;
private String subscriberName;
private int maxBatchSize=10;
private LinkedList batchList=new LinkedList();
private Map topics=new HashMap();
private LinkedList storePrefetches=new LinkedList();
private AtomicBoolean started=new AtomicBoolean();
private boolean started;
private PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
/**
* @param topic
@ -47,24 +52,26 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
* @param subscriberName
* @throws IOException
*/
public StoreDurableSubscriberCursor(String clientId,String subscriberName){
public StoreDurableSubscriberCursor(String clientId,String subscriberName,Store store,int maxBatchSize){
this.clientId=clientId;
this.subscriberName=subscriberName;
this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store);
storePrefetches.add(nonPersistent);
}
public synchronized void start() throws Exception{
started.set(true);
started=true;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.start();
pendingCount+=tsp.size();
}
}
public synchronized void stop() throws Exception{
started.set(false);
started=false;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.stop();
}
pendingCount=0;
@ -78,10 +85,11 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
* @throws Exception
*/
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
topics.put(destination,tsp);
storePrefetches.add(tsp);
if(started.get()){
if(started){
tsp.start();
pendingCount+=tsp.size();
}
@ -95,7 +103,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
* @throws Exception
*/
public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
Object tsp=topics.remove(destination);
if(tsp!=null){
storePrefetches.remove(tsp);
}
@ -119,12 +127,32 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
return false;
}
public synchronized void addMessageFirst(MessageReference node){
pendingCount++;
public synchronized void addMessageFirst(MessageReference node) throws IOException{
if(started){
throw new RuntimeException("This shouldn't be called!");
}
}
public synchronized void addMessageLast(MessageReference node){
pendingCount++;
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(started){
if(node!=null){
Message msg=node.getMessage();
if(!msg.isPersistent()){
nonPersistent.addMessageLast(node);
}else{
Destination dest=msg.getRegionDestination();
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
if(tsp!=null){
tsp.addMessageLast(node);
// if the store has been empty - then this message is next to dispatch
if((pendingCount-nonPersistent.size())<=0){
tsp.nextToDispatch(node.getMessageId());
}
}
}
pendingCount++;
}
}
}
public void clear(){
@ -132,49 +160,56 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
public synchronized boolean hasNext(){
return !isEmpty();
}
public synchronized MessageReference next(){
MessageReference result=null;
if(!isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Couldn't fill batch from store ",e);
throw new RuntimeException(e);
}
}
if(!batchList.isEmpty()){
result=(MessageReference) batchList.removeFirst();
boolean result=pendingCount>0;
if(result){
try{
currentCursor=getNextCursor();
}catch(Exception e){
log.error("Failed to get current cursor ",e);
throw new RuntimeException(e);
}
result=currentCursor!=null?currentCursor.hasNext():false;
}
return result;
}
public synchronized MessageReference next(){
return currentCursor!=null?currentCursor.next():null;
}
public synchronized void remove(){
if(currentCursor!=null){
currentCursor.remove();
}
pendingCount--;
}
public void reset(){
batchList.clear();
public synchronized void reset(){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.reset();
}
}
public int size(){
return pendingCount;
}
private synchronized void fillBatch() throws Exception{
for(Iterator i=storePrefetches.iterator();i.hasNext();){
TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
tsp.fillBatch();
if(batchList.size()>=maxBatchSize){
break;
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
currentCursor=null;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.setMaxBatchSize(getMaxBatchSize());
if(tsp.hasNext()){
currentCursor=tsp;
break;
}
}
// round-robin
Object obj=storePrefetches.removeFirst();
storePrefetches.addLast(obj);
}
// round-robin
Object obj=storePrefetches.removeFirst();
storePrefetches.addLast(obj);
return currentCursor;
}
}

View File

@ -1,20 +1,27 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import javax.jms.JMSException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@ -23,134 +30,114 @@ import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
* perist pending messages pending message (messages awaiting disptach to a
* consumer) cursor
*
* @version $Revision$
*/
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
class TopicStorePrefetch extends AbstractPendingMessageCursor implements
MessageRecoveryListener {
static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
private Topic topic;
private TopicMessageStore store;
private LinkedList batchList;
private final LinkedList batchList=new LinkedList();
private String clientId;
private String subscriberName;
private int pendingCount=0;
private MessageId lastMessageId;
private int maxBatchSize=10;
private Destination regionDestination;
/**
* @param topic
* @param batchList
* @param clientId
* @param subscriberName
* @throws IOException
*/
public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
this.topic=topic;
this.store=(TopicMessageStore) topic.getMessageStore();
this.batchList=batchList;
public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
this.regionDestination = topic;
this.store=(TopicMessageStore)topic.getMessageStore();
this.clientId=clientId;
this.subscriberName=subscriberName;
}
public void start() throws Exception{
pendingCount=store.getMessageCount(clientId,subscriberName);
System.err.println("Pending count = "+pendingCount);
}
public void stop() throws Exception{
pendingCount=0;
lastMessageId=null;
store.resetBatching(clientId,clientId,null);
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return pendingCount==0;
return batchList.isEmpty();
}
/**
* Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
* may do
*
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired(){
return false;
}
public synchronized void addMessageFirst(MessageReference node){
pendingCount++;
}
public synchronized void addMessageLast(MessageReference node){
pendingCount++;
}
public void clear(){
pendingCount=0;
lastMessageId=null;
public synchronized int size(){
try{
return store.getMessageCount(clientId,subscriberName);
}catch(IOException e){
log.error(this + " Failed to get the outstanding message count from the store",e);
throw new RuntimeException(e);
}
}
public synchronized boolean hasNext(){
if(isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
}
return !isEmpty();
}
public synchronized MessageReference next(){
MessageReference result=null;
if(!isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
throw new RuntimeException(e);
}
}
result=(MessageReference) batchList.removeFirst();
}
Message result = (Message)batchList.removeFirst();
result.setRegionDestination(regionDestination);
return result;
}
public synchronized void remove(){
pendingCount--;
}
public void reset(){
batchList.clear();
}
public int size(){
return pendingCount;
}
// MessageRecoveryListener implementation
public void finished(){}
public void finished(){
}
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
batchList.addLast(message);
}
public void recoverMessageReference(String messageReference) throws Exception{
public void recoverMessageReference(String messageReference)
throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
// implementation
protected void fillBatch() throws Exception{
if(pendingCount<=0){
pendingCount=store.getMessageCount(clientId,subscriberName);
}
if(pendingCount>0){
store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message) batchList.getLast();
lastMessageId=message.getMessageId();
}
store.recoverNextMessages(clientId,subscriberName,lastMessageId,
maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message)batchList.getLast();
lastMessageId=message.getMessageId();
}
}
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
}
synchronized void nextToDispatch(MessageId id) throws Exception {
lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
store.resetBatching(clientId,clientId,id);
}
}

View File

@ -1,58 +1,55 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
/**
*Represents a container of persistent objects in the store
*Acts as a map, but values can be retrieved in insertion order
* Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion
* order
*
* @version $Revision: 1.2 $
*/
public interface ListContainer extends List{
/**
* The container is created or retrieved in
* an unloaded state.
* load populates the container will all the indexes used etc
* and should be called before any operations on the container
* The container is created or retrieved in an unloaded state. load populates the container will all the indexes
* used etc and should be called before any operations on the container
*/
public void load();
/**
* unload indexes from the container
*
*
*/
public void unload();
/**
* @return true if the indexes are loaded
*/
public boolean isLoaded();
/**
* For homogenous containers can set a custom marshaller for loading values
* The default uses Object serialization
* @param marshaller
* For homogenous containers can set a custom marshaller for loading values The default uses Object serialization
*
* @param marshaller
*/
public void setMarshaller(Marshaller marshaller);
/**
* @return the id the MapContainer was create with
*/
@ -62,46 +59,46 @@ public interface ListContainer extends List{
* @return the number of values in the container
*/
public int size();
/**
* Inserts the given element at the beginning of this list.
*
*
* @param o the element to be inserted at the beginning of this list.
*/
public void addFirst(Object o);
/**
* Appends the given element to the end of this list. (Identical in
* function to the <tt>add</tt> method; included only for consistency.)
*
* Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
* only for consistency.)
*
* @param o the element to be inserted at the end of this list.
*/
public void addLast(Object o);
/**
* Removes and returns the first element from this list.
*
*
* @return the first element from this list.
* @throws NoSuchElementException if this list is empty.
* @throws NoSuchElementException if this list is empty.
*/
public Object removeFirst();
/**
* Removes and returns the last element from this list.
*
*
* @return the last element from this list.
* @throws NoSuchElementException if this list is empty.
* @throws NoSuchElementException if this list is empty.
*/
public Object removeLast();
/**
* remove an objecr from the list without retrieving the old value from the store
*
* @param position
* @return true if successful
*/
public boolean doRemove(int position);
/**
* @return the maximumCacheSize
*/
@ -111,46 +108,87 @@ public interface ListContainer extends List{
* @param maximumCacheSize the maximumCacheSize to set
*/
public void setMaximumCacheSize(int maximumCacheSize);
/**
* clear any cached values
*/
public void clearCache();
/**
* add an Object to the list but get a StoreEntry of its position
*
* @param object
* @return the entry in the Store
*/
public StoreEntry placeLast(Object object);
/**
* insert an Object in first position int the list but get a StoreEntry of its position
*
* @param object
* @return the location in the Store
*/
public StoreEntry placeFirst(Object object);
/**
* Advanced feature = must ensure the object written doesn't overwrite other objects in the container
* @param entry
* @param object
*
* @param entry
* @param object
*/
public void update(StoreEntry entry, Object object);
public void update(StoreEntry entry,Object object);
/**
* Retrieve an Object from the Store by its location
*
* @param entry
* @return the Object at that entry
*/
public Object get(StoreEntry entry);
/**
* Get the StoreEntry for the first item of the list
*
* @return the first StoreEntry or null if the list is empty
*/
public StoreEntry getFirst();
/**
* Get yjr StoreEntry for the last item of the list
*
* @return the last StoreEntry or null if the list is empty
*/
public StoreEntry getLast();
/**
* Get the next StoreEntry from the list
*
* @param entry
* @return the next StoreEntry or null
*/
public StoreEntry getNext(StoreEntry entry);
/**
* Get the previous StoreEntry from the list
*
* @param entry
* @return the previous store entry or null
*/
public StoreEntry getPrevious(StoreEntry entry);
/**
* remove the Object at the StoreEntry
*
* @param entry
* @return true if successful
*/
public boolean remove(StoreEntry entry);
/**
* It's possible that a StoreEntry could be come stale
* this will return an upto date entry for the StoreEntry position
* @param entry old entry
* @return a refreshed StoreEntry
*/
public StoreEntry refresh(StoreEntry entry);
}

View File

@ -381,7 +381,7 @@ public class KahaStore implements Store{
if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
throw new RuntimeException("Unknown IndexType: "+type);
}
this.indexType=indexType;
this.indexType=type;
}
public synchronized void initialize() throws IOException{

View File

@ -18,9 +18,7 @@
package org.apache.activemq.kaha.impl.container;
import java.util.ListIterator;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
/**
* @version $Revision$
@ -28,14 +26,12 @@ import org.apache.activemq.kaha.impl.index.IndexLinkedList;
public class CachedContainerListIterator implements ListIterator{
protected ListContainerImpl container;
protected IndexLinkedList list;
protected int pos = 0;
protected int nextPos =0;
protected StoreEntry currentItem;
protected CachedContainerListIterator(ListContainerImpl container,int start){
this.container=container;
this.list=list;
this.pos=start;
this.nextPos = this.pos;
}

View File

@ -1,19 +1,15 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl.container;
@ -52,8 +48,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
protected int maximumCacheSize=100;
protected IndexItem lastCached;
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType)
throws IOException{
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
String indexType) throws IOException{
super(id,root,indexManager,dataManager,indexType);
}
@ -462,15 +458,15 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
indexList.add(index,item);
itemAdded(item,index,element);
}
protected StoreEntry internalAddLast(Object o) {
protected StoreEntry internalAddLast(Object o){
load();
IndexItem item=writeLast(o);
indexList.addLast(item);
itemAdded(item,indexList.size()-1,o);
return item;
}
protected StoreEntry internalAddFirst(Object o){
load();
IndexItem item=writeFirst(o);
@ -486,8 +482,6 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
itemAdded(item,index,element);
return item;
}
protected StoreEntry internalGet(int index){
load();
@ -623,27 +617,29 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
return result;
}
/**
* add an Object to the list but get a StoreEntry of its position
*
* @param object
* @return the entry in the Store
*/
public synchronized StoreEntry placeLast(Object object) {
StoreEntry item = internalAddLast(object);
public synchronized StoreEntry placeLast(Object object){
StoreEntry item=internalAddLast(object);
return item;
}
/**
* insert an Object in first position int the list but get a StoreEntry of its position
*
* @param object
* @return the location in the Store
*/
public synchronized StoreEntry placeFirst(Object object) {
StoreEntry item = internalAddFirst(object);
public synchronized StoreEntry placeFirst(Object object){
StoreEntry item=internalAddFirst(object);
return item;
}
/**
* @param entry
* @param object
@ -651,41 +647,90 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/
public void update(StoreEntry entry,Object object){
try{
dataManager.updateItem(entry.getValueDataItem(),marshaller, object);
dataManager.updateItem(entry.getValueDataItem(),marshaller,object);
}catch(IOException e){
throw new RuntimeException(e);
}
}
/**
* Retrieve an Object from the Store by its location
*
* @param entry
* @return the Object at that entry
*/
public synchronized Object get(StoreEntry entry){
load();
return getValue(entry);
}
/**
* remove the Object at the StoreEntry
*
* @param entry
* @return true if successful
*/
public synchronized boolean remove(StoreEntry entry){
IndexItem item=(IndexItem)entry;
load();
boolean result=false;
if(item!=null){
clearCache();
remove(item);
result = true;
}
return result;
}
/**
* Get the StoreEntry for the first item of the list
*
* @return the first StoreEntry or null if the list is empty
*/
public synchronized StoreEntry getFirst(){
return indexList.getFirst();
}
/**
* Get yjr StoreEntry for the last item of the list
*
* @return the last StoreEntry or null if the list is empty
*/
public synchronized StoreEntry getLast(){
return indexList.getLast();
}
/**
* Get the next StoreEntry from the list
*
* @param entry
* @return the next StoreEntry or null
*/
public synchronized StoreEntry getNext(StoreEntry entry){
IndexItem item=(IndexItem)entry;
return indexList.getNextEntry(item);
}
/**
* Get the previous StoreEntry from the list
*
* @param entry
* @return the previous store entry or null
*/
public synchronized StoreEntry getPrevious(StoreEntry entry){
IndexItem item=(IndexItem)entry;
return indexList.getPrevEntry(item);
}
/**
* Retrieve an Object from the Store by its location
* @param entry
* @return the Object at that entry
*/
public synchronized Object get(StoreEntry entry) {
load();
return getValue(entry);
}
/**
* remove the Object at the StoreEntry
* @param entry
* @return true if successful
*/
public synchronized boolean remove(StoreEntry entry) {
IndexItem item = (IndexItem)entry;
load();
boolean result = false;
if(item!=null){
clearCache();
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(item);
delete(item,prev,next);
}
return result;
}
* It's possible that a StoreEntry could be come stale
* this will return an upto date entry for the StoreEntry position
* @param entry old entry
* @return a refreshed StoreEntry
*/
public synchronized StoreEntry refresh(StoreEntry entry) {
return indexList.getEntry(entry);
}
protected IndexItem writeLast(Object value){
IndexItem index=null;
@ -782,7 +827,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
if(item!=null){
try{
// ensure it's up to date
//item=indexList.getEntry(item);
// item=indexList.getEntry(item);
StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(marshaller,data);
}catch(IOException e){
@ -903,8 +948,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
/**
* @param cacheList
* the cacheList to set
* @param cacheList the cacheList to set
*/
public synchronized void setCacheList(LinkedList cacheList){
this.cacheList=cacheList;
@ -918,8 +962,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
/**
* @param lastCached
* the lastCached to set
* @param lastCached the lastCached to set
*/
public synchronized void setLastCached(IndexItem lastCached){
this.lastCached=lastCached;
@ -933,8 +976,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
/**
* @param maximumCacheSize
* the maximumCacheSize to set
* @param maximumCacheSize the maximumCacheSize to set
*/
public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
@ -948,12 +990,9 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
/**
* @param offset
* the offset to set
* @param offset the offset to set
*/
public synchronized void setOffset(int offset){
this.offset=offset;
}
}

View File

@ -86,8 +86,16 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
return delegate.getNextMessageToDeliver(clientId,subscriptionName);
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
delegate.resetBatching(clientId,subscriptionName,id);
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public ActiveMQDestination getDestination() {

View File

@ -78,15 +78,37 @@ public interface TopicMessageStore extends MessageStore{
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
* @param clientId
* @param subscriptionName
* @param nextToDispatch
*
*/
public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
/**
* Get the next un-acknowledged message to deliver to a subscriber
* Get the next messageId to deliver to a subscriber after the MessageId provided
* @param clientId
* @param subscriptionName
* @return the next message or null
* @param id
* @return the next messageId or null
* @throws IOException
* @throws Exception
*/
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
/**
* Get the previous messageId to deliver to a subscriber before the MessageId provided
* @param clientId
* @param subscriptionName
* @param id
* @return the next messageId or null
* @throws IOException
* @throws Exception
*/
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
/**

View File

@ -1,26 +1,22 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@ -28,62 +24,69 @@ import org.apache.activemq.command.SubscriptionInfo;
/**
* @version $Revision: 1.5 $
*/
public interface JDBCAdapter {
public interface JDBCAdapter{
public void setStatements(Statements statementProvider);
public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException;
public abstract void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration) throws SQLException, IOException;
public abstract void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
public abstract void doDropTables(TransactionContext c) throws SQLException,IOException;
public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
byte[] data,long expiration) throws SQLException,IOException;
public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
public abstract void doAddMessageReference(TransactionContext c,MessageId messageId,
ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException;
public abstract void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
throws Exception;
public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException;
public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException,
IOException;
public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException;
public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException;
public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
public abstract void doRecover(TransactionContext c,ActiveMQDestination destination,
JDBCMessageRecoveryListener listener) throws Exception;
public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName)
throws SQLException, IOException;
public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,long seq) throws SQLException,IOException;
public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName)
throws SQLException, IOException;
public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
public abstract void doDeleteOldMessages(TransactionContext c)
throws SQLException, IOException;
public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriptionName) throws SQLException,IOException;
public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException;
public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName)
throws SQLException,IOException;
public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId,
String subscriptionName) throws SQLException,IOException;
public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException;
public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException;
public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException;
public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
throws SQLException,IOException;
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName) throws SQLException,IOException;
public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision: 1.6 $
@ -110,12 +111,16 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
}
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
}
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
@ -175,21 +180,75 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
Message result = null;
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
final MessageId result = new MessageId();
final AtomicBoolean initalized = new AtomicBoolean();
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
result = (Message) wireFormat.unmarshal(new ByteSequence(data));
long sequence = id != null ? id.getBrokerSequenceId() : -1;
adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
initalized.set(true);
}
public void recoverMessageReference(String reference) throws Exception {
result.setValue(reference);
initalized.set(true);
}
public void finished(){
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
return initalized.get () ? result : null;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
final MessageId result = new MessageId();
final AtomicBoolean initalized = new AtomicBoolean();
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
long sequence = id != null ? id.getBrokerSequenceId() : -1;
adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
result.setProducerId(msg.getMessageId().getProducerId());
result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
initalized.set(true);
}
public void recoverMessageReference(String reference) throws Exception {
result.setValue(reference);
initalized.set(true);
}
public void finished(){
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return initalized.get () ? result : null;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
@ -200,7 +259,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}

View File

@ -64,6 +64,8 @@ public class Statements {
private String lockUpdateStatement;
private String nextDurableSubscriberMessageStatement;
private String durableSubscriberMessageCountStatement;
private String nextDurableSubscriberMessageIdStatement;
private String prevDurableSubscriberMessageIdStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@ -210,10 +212,9 @@ public class Statements {
public String getFindDurableSubMessagesStatement(){
if(findDurableSubMessagesStatement==null){
findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, " + getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)";
findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID";
}
return findDurableSubMessagesStatement;
}
@ -229,10 +230,9 @@ public class Statements {
public String getNextDurableSubscriberMessageStatement(){
if (nextDurableSubscriberMessageStatement == null){
nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)";
nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
}
return nextDurableSubscriberMessageStatement;
}
@ -240,14 +240,55 @@ public class Statements {
/**
* @return the durableSubscriberMessageCountStatement
*/
public String getDurableSubscriberMessageCountStatement(){
if (durableSubscriberMessageCountStatement==null){
durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
}
return durableSubscriberMessageCountStatement;
}
/**
* @return the nextDurableSubscriberMessageIdStatement
*/
public String getNextDurableSubscriberMessageIdStatement(){
if (nextDurableSubscriberMessageIdStatement==null) {
nextDurableSubscriberMessageIdStatement =
"SELECT M.ID FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
}
return nextDurableSubscriberMessageIdStatement;
}
/**
* @return the prevDurableSubscriberMessageIdStatement
*/
/*
public String getPrevDurableSubscriberMessageIdStatement(){
if(prevDurableSubscriberMessageIdStatement==null) {
prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID ";
}
return prevDurableSubscriberMessageIdStatement;
}
*/
public String getPrevDurableSubscriberMessageIdStatement(){
if(prevDurableSubscriberMessageIdStatement==null) {
prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M "
+ " WHERE M.CONTAINER=? "
+ " AND M.ID <?" + " ORDER BY M.ID DESC ";
}
return prevDurableSubscriberMessageIdStatement;
}
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
@ -565,4 +606,25 @@ public class Statements {
public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
}
/**
* @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set
*/
public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){
this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
}
/**
* @param prevDurableSubscriberMessageIdStatement the prevDurableSubscriberMessageIdStatement to set
*/
public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
}
}

View File

@ -190,15 +190,24 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
return longTermStore.getAllSubscriptions();
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) {
longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch);
}

View File

@ -217,15 +217,25 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
return longTermStore.getAllSubscriptions();
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextId) {
longTermStore.resetBatching(clientId,subscriptionName,nextId);
}

View File

@ -0,0 +1,58 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.kaha.StoreEntry;
/**
* Holds information for location of message
*
* @version $Revision: 1.10 $
*/
public class ConsumerMessageRef{
private StoreEntry messageEntry;
private StoreEntry ackEntry;
/**
* @return the ackEntry
*/
public StoreEntry getAckEntry(){
return this.ackEntry;
}
/**
* @param ackEntry the ackEntry to set
*/
public void setAckEntry(StoreEntry ackEntry){
this.ackEntry=ackEntry;
}
/**
* @return the messageEntry
*/
public StoreEntry getMessageEntry(){
return this.messageEntry;
}
/**
* @param messageEntry the messageEntry to set
*/
public void setMessageEntry(StoreEntry messageEntry){
this.messageEntry=messageEntry;
}
}

View File

@ -0,0 +1,69 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.index.IndexItem;
/**
* Marshall a TopicSubAck
* @version $Revision: 1.10 $
*/
public class ConsumerMessageRefMarshaller implements Marshaller{
/**
* @param object
* @param dataOut
* @throws IOException
* @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
*/
public void writePayload(Object object,DataOutput dataOut) throws IOException{
ConsumerMessageRef ref = (ConsumerMessageRef) object;
IndexItem item = (IndexItem)ref.getMessageEntry();
dataOut.writeLong(item.getOffset());
item.write(dataOut);
item = (IndexItem)ref.getAckEntry();
dataOut.writeLong(item.getOffset());
item.write(dataOut);
}
/**
* @param dataIn
* @return payload
* @throws IOException
* @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput)
*/
public Object readPayload(DataInput dataIn) throws IOException{
ConsumerMessageRef ref = new ConsumerMessageRef();
IndexItem item = new IndexItem();
item.setOffset(dataIn.readLong());
item.read(dataIn);
ref.setMessageEntry(item);
item = new IndexItem();
item.setOffset(dataIn.readLong());
item.read(dataIn);
ref.setAckEntry(item);
return ref;
}
}

View File

@ -1,27 +1,23 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -32,68 +28,69 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* @version $Revision: 1.5 $
*/
public class KahaTopicMessageStore implements TopicMessageStore{
public class KahaTopicMessageStore implements TopicMessageStore{
private ActiveMQDestination destination;
private ListContainer ackContainer;
private ListContainer messageContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberAcks=new ConcurrentHashMap();
private Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
this.messageContainer = messageContainer;
this.destination = destination;
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
this.messageContainer=messageContainer;
this.destination=destination;
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberAckContainer(key);
addSubscriberMessageContainer(key);
}
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberAcks.size();
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry entry = messageContainer.placeLast(message);
TopicSubAck tsa = new TopicSubAck();
StoreEntry messageEntry=messageContainer.placeLast(message);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setStoreEntry(entry);
StoreEntry ackEntry = ackContainer.placeLast(tsa);
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
Object key=i.next();
ListContainer container=store.getListContainer(key,"durable-subs");
container.add(ackEntry);
tsa.setMessageEntry(messageEntry);
StoreEntry ackEntry=ackContainer.placeLast(tsa);
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(ackEntry);
ref.setMessageEntry(messageEntry);
container.getListContainer().add(ref);
}
}
}
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
StoreEntry ackEntry=(StoreEntry)container.removeFirst();
if(ackEntry!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ackEntry);
messageContainer.remove(tsa.getStoreEntry());
}else {
ackContainer.update(ackEntry,tsa);
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
@ -101,11 +98,11 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
@ -117,23 +114,23 @@ public class KahaTopicMessageStore implements TopicMessageStore{
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
addSubscriberAckContainer(key);
addSubscriberMessageContainer(key);
}
public synchronized void deleteSubscription(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
ListContainer list=(ListContainer) subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
StoreEntry ackEntry=(StoreEntry)i.next();
if(ackEntry!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ackEntry);
messageContainer.remove(tsa.getStoreEntry());
}else {
ackContainer.update(ackEntry,tsa);
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
@ -141,18 +138,18 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
if(list!=null){
for(Iterator i=list.iterator();i.hasNext();){
StoreEntry entry = (StoreEntry)i.next();
Object msg=messageContainer.get(entry);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message) msg);
listener.recoverMessage((Message)msg);
}
}
listener.finished();
@ -161,42 +158,40 @@ public class KahaTopicMessageStore implements TopicMessageStore{
listener.finished();
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception{
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
if(list!=null){
boolean startFound=false;
int count = 0;
for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
StoreEntry entry = (StoreEntry)i.next();
Object msg=messageContainer.get(entry);
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if (startFound || lastMessageId == null){
listener.recoverMessageReference(ref);
count++;
}
else if(startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else{
listener.recoverMessage(message);
count++;
}
}
}
listener.finished();
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getListContainer().getFirst();
}else{
entry=container.getListContainer().refresh(entry);
entry=container.getListContainer().getNext(entry);
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
listener.recoverMessageReference(ref);
}else{
Message message=(Message)msg;
listener.recoverMessage(message);
}
count++;
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
}while(entry!=null&&count<maxReturned);
}
}else{
listener.finished();
}
listener.finished();
}
public void delete(){
@ -206,8 +201,8 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[]) subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected String getSubscriptionKey(String clientId,String subscriberName){
@ -216,25 +211,18 @@ public class KahaTopicMessageStore implements TopicMessageStore{
return result;
}
protected void addSubscriberAckContainer(Object key) throws IOException{
protected void addSubscriberMessageContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
Marshaller marshaller=new StoreEntryMarshaller();
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
StoreEntry entry = (StoreEntry)list.get(0);
Message msg=(Message)messageContainer.get(entry);
return msg;
TopicSubContainer tsc=new TopicSubContainer(container);
subscriberMessages.put(key,tsc);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
return list.size();
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.getListContainer().size();
}
/**
@ -243,11 +231,12 @@ public class KahaTopicMessageStore implements TopicMessageStore{
* @param expirationTime
* @param messageRef
* @throws IOException
* @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageId, long, java.lang.String)
* @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
* org.apache.activemq.command.MessageId, long, java.lang.String)
*/
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
messageContainer.add(messageRef);
}
/**
@ -255,7 +244,7 @@ public class KahaTopicMessageStore implements TopicMessageStore{
* @see org.apache.activemq.store.MessageStore#getDestination()
*/
public ActiveMQDestination getDestination(){
return destination;
return destination;
}
/**
@ -265,11 +254,11 @@ public class KahaTopicMessageStore implements TopicMessageStore{
* @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
*/
public Message getMessage(MessageId identity) throws IOException{
Message result = null;
for (Iterator i = messageContainer.iterator(); i.hasNext();){
Message msg = (Message)i.next();
if (msg.getMessageId().equals(identity)) {
result = msg;
Message result=null;
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(identity)){
result=msg;
break;
}
}
@ -294,13 +283,12 @@ public class KahaTopicMessageStore implements TopicMessageStore{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
Object msg=iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message) msg);
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
/**
@ -308,26 +296,30 @@ public class KahaTopicMessageStore implements TopicMessageStore{
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException{
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
container.getListContainer().clear();
}
}
/**
* @param context
* @param ack
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageAck)
* @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext,
* org.apache.activemq.command.MessageAck)
*/
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
for (Iterator i = messageContainer.iterator(); i.hasNext();){
Message msg = (Message)i.next();
if (msg.getMessageId().equals(ack.getLastMessageId())) {
i.remove();
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(ack.getLastMessageId())){
i.remove();
break;
}
}
}
/**
@ -336,7 +328,6 @@ public class KahaTopicMessageStore implements TopicMessageStore{
*/
public void setUsageManager(UsageManager usageManager){
// TODO Auto-generated method stub
}
/**
@ -345,7 +336,6 @@ public class KahaTopicMessageStore implements TopicMessageStore{
*/
public void start() throws Exception{
// TODO Auto-generated method stub
}
/**
@ -354,8 +344,76 @@ public class KahaTopicMessageStore implements TopicMessageStore{
*/
public void stop() throws Exception{
// TODO Auto-generated method stub
}
/**
* @param clientId
* @param subscriptionName
* @see org.apache.activemq.store.TopicMessageStore#resetBatching(java.lang.String, java.lang.String)
*/
public synchronized void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
if(topicSubContainer!=null){
topicSubContainer.reset();
if(nextToDispatch!=null){
StoreEntry entry=topicSubContainer.getListContainer().getFirst();
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)topicSubContainer.getListContainer().get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if(msg.toString().equals(nextToDispatch.toString())){
// need to set the entry to the previous one
// to ensure we start in the right place
topicSubContainer
.setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
break;
}
}else{
Message message=(Message)msg;
if(message!=null&&message.getMessageId().equals(nextToDispatch)){
// need to set the entry to the previous one
// to ensure we start in the right place
topicSubContainer
.setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
break;
}
}
}
entry=topicSubContainer.getListContainer().getNext(entry);
}while(entry!=null);
}
}
}
/**
* @param clientId
* @param subscriptionName
* @param id
* @return next messageId
* @throws IOException
* @see org.apache.activemq.store.TopicMessageStore#getNextMessageIdToDeliver(java.lang.String, java.lang.String,
* org.apache.activemq.command.MessageId)
*/
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// TODO Auto-generated method stub
return null;
}
/**
* @param clientId
* @param subscriptionName
* @param id
* @return previous messageId
* @throws IOException
* @see org.apache.activemq.store.TopicMessageStore#getPreviousMessageIdToDeliver(java.lang.String,
* java.lang.String, org.apache.activemq.command.MessageId)
*/
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
throws IOException{
// TODO Auto-generated method stub
return null;
}
}

View File

@ -24,7 +24,7 @@ import org.apache.activemq.kaha.StoreEntry;
public class TopicSubAck{
private int count =0;
private StoreEntry storeEntry;
private StoreEntry messageEntry;
/**
* @return the count
@ -56,18 +56,18 @@ public class TopicSubAck{
/**
* @return the storeEntry
* @return the messageEntry
*/
public StoreEntry getStoreEntry(){
return this.storeEntry;
public StoreEntry getMessageEntry(){
return this.messageEntry;
}
/**
* @param storeEntry the storeEntry to set
* @param messageEntry the messageEntry to set
*/
public void setStoreEntry(StoreEntry storeEntry){
this.storeEntry=storeEntry;
public void setMessageEntry(StoreEntry storeEntry){
this.messageEntry=storeEntry;
}

View File

@ -34,7 +34,7 @@ public class TopicSubAckMarshaller implements Marshaller{
public void writePayload(Object object,DataOutput dataOut) throws IOException{
TopicSubAck tsa = (TopicSubAck) object;
dataOut.writeInt(tsa.getCount());
IndexItem item = (IndexItem)tsa.getStoreEntry();
IndexItem item = (IndexItem)tsa.getMessageEntry();
dataOut.writeLong(item.getOffset());
item.write(dataOut);
@ -47,7 +47,7 @@ public class TopicSubAckMarshaller implements Marshaller{
IndexItem item = new IndexItem();
item.setOffset(dataIn.readLong());
item.read(dataIn);
tsa.setStoreEntry(item);
tsa.setMessageEntry(item);
return tsa;
}
}

View File

@ -0,0 +1,65 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
/**
* Holds information for the subscriber
*
* @version $Revision: 1.10 $
*/
class TopicSubContainer{
private ListContainer listContainer;
private StoreEntry batchEntry;
TopicSubContainer(ListContainer container){
this.listContainer = container;
}
/**
* @return the batchEntry
*/
StoreEntry getBatchEntry(){
return this.batchEntry;
}
/**
* @param batchEntry the batchEntry to set
*/
void setBatchEntry(StoreEntry batchEntry){
this.batchEntry=batchEntry;
}
/**
* @return the listContainer
*/
ListContainer getListContainer(){
return this.listContainer;
}
/**
* @param listContainer the listContainer to set
*/
void setListContainer(ListContainer container){
this.listContainer=container;
}
void reset() {
batchEntry = null;
}
}

View File

@ -156,14 +156,34 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// the message table is a synchronizedMap - so just have to synchronize here
boolean matchFound = false;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(entry.getKey().equals(lastAck)){
return (Message) entry.getValue();
if(!matchFound && entry.getKey().equals(id)){
matchFound = true;
}else if (matchFound) {
Message msg = (Message) entry.getValue();
return msg.getMessageId();
}
}
}
return null;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// the message table is a synchronizedMap - so just have to synchronize here
Message last= null;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(entry.getKey().equals(id)){
return last != null ? last.getMessageId() : null;
}else {
last = (Message)entry.getValue();
}
}
}
@ -186,4 +206,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return result;
}
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
}
}

View File

@ -1,20 +1,17 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.IOException;
@ -23,7 +20,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
@ -42,7 +38,6 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@ -51,76 +46,73 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
*
* @version $Revision: 1.13 $
*/
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
private HashMap ackedLastAckLocations = new HashMap();
private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class);
private HashMap ackedLastAckLocations=new HashMap();
private final MapContainer subscriberContainer;
private final MapContainer ackContainer;
private final Store store;
private Map subscriberAcks=new ConcurrentHashMap();
public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
super(adapter, destination, messageContainer);
this.subscriberContainer = subsContainer;
this.ackContainer = ackContainer;
public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination,
MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{
super(adapter,destination,messageContainer);
this.subscriberContainer=subsContainer;
this.ackContainer=ackContainer;
this.store=adapter.getStore();
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberAckContainer(key);
}
}
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
ListContainer list=(ListContainer)subscriberAcks.get(key);
if(list!=null){
for(Iterator i=list.iterator();i.hasNext();){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message) msg);
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
} else {
}else{
listener.finished();
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception{
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
ListContainer list=(ListContainer)subscriberAcks.get(key);
if(list!=null){
boolean startFound=false;
int count = 0;
for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
int count=0;
for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if (startFound || lastMessageId == null){
if(startFound||lastMessageId==null){
listener.recoverMessageReference(ref);
count++;
}
else if(startFound||ref.equals(lastMessageId.toString())){
}else if(!startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else{
Message message=(Message)msg;
if(startFound||lastMessageId==null){
listener.recoverMessage(message);
count++;
}else if(!startFound&&message.getMessageId().equals(lastMessageId)){
startFound=true;
}
}
}
@ -131,11 +123,12 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
}
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
@ -163,148 +156,139 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
super.addMessage(context,message);
}
}
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
final boolean debug = log.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
throws IOException{
final boolean debug=log.isDebugEnabled();
JournalTopicAck ack=new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if( !context.isInTransaction() ) {
if( debug )
ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
final RecordLocation location=peristenceAdapter.writeCommand(ack,false);
final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
if(!context.isInTransaction()){
if(debug)
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
acknowledge(messageId, location, key);
} else {
if( debug )
acknowledge(messageId,location,key);
}else{
if(debug)
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
synchronized (this) {
synchronized(this){
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this, ack, location);
transactionStore.acknowledge(this,ack,location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception {
if( debug )
public void afterCommit() throws Exception{
if(debug)
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
synchronized (RapidTopicMessageStore.this) {
synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
acknowledge(messageId, location, key);
acknowledge(messageId,location,key);
}
}
public void afterRollback() throws Exception {
if( debug )
public void afterRollback() throws Exception{
if(debug)
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
synchronized (RapidTopicMessageStore.this) {
synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
}
public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
try {
synchronized(this) {
public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
try{
synchronized(this){
String subcriberId=getSubscriptionKey(clientId,subscritionName);
String id=messageId.toString();
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
//container.remove(id);
// container.remove(id);
container.removeFirst();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
} else {
}else{
// no more references to message messageContainer so remove it
messageContainer.remove(messageId.toString());
}
}
}
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}catch(Throwable e){
log.debug("Could not replay acknowledge for message '"+messageId
+"'. Message may have already been acknowledged. reason: "+e);
}
}
/**
* @param messageId
* @param location
* @param key
*/
private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
synchronized(this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){
synchronized(this){
lastLocation=location;
ackedLastAckLocations.put(key,messageId);
String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
String id=messageId.toString();
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
//container.remove(id);
// container.remove(id);
container.removeFirst();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
} else {
}else{
// no more references to message messageContainer so remove it
messageContainer.remove(messageId.toString());
}
}
}
}
}
}
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
public RecordLocation checkpoint() throws IOException {
ArrayList cpAckedLastAckLocations;
public RecordLocation checkpoint() throws IOException{
ArrayList cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
this.ackedLastAckLocations = new HashMap();
synchronized(this){
cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
this.ackedLastAckLocations=new HashMap();
}
RecordLocation rc = super.checkpoint();
if(!cpAckedLastAckLocations.isEmpty()) {
RecordLocation rc=super.checkpoint();
if(!cpAckedLastAckLocations.isEmpty()){
Collections.sort(cpAckedLastAckLocations);
RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
if( rc == null || t.compareTo(rc)<0 ) {
rc = t;
RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
if(rc==null||t.compareTo(rc)<0){
rc=t;
}
}
return rc;
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
ListContainer list=(ListContainer) subscriberAcks.get(key);
ListContainer list=(ListContainer)subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
@ -316,30 +300,63 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
}
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberContainer.values().toArray(
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected void addSubscriberAckContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
ListContainer container=store.getListContainer(key,"durable-subs");
Marshaller marshaller=new StringMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
throws IOException{
MessageId result=null;
boolean getNext=false;
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
Iterator iter = list.iterator();
return (Message) (iter.hasNext() ? iter.next() : null);
ListContainer list=(ListContainer)subscriberAcks.get(key);
Iterator iter=list.iterator();
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
if(id.equals(messageId.toString())){
getNext=true;
}else if(getNext){
result=new MessageId(id);
break;
}
}
return result;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
throws IOException{
MessageId result=null;
String previousId=null;
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer)subscriberAcks.get(key);
Iterator iter=list.iterator();
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
if(id.equals(messageId.toString())){
if(previousId!=null){
result=new MessageId(previousId);
}
break;
}
previousId=id;
}
return result;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
ListContainer list=(ListContainer)subscriberAcks.get(key);
return list.size();
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
}
}

View File

@ -507,7 +507,7 @@ public class JMSConsumerTest extends JmsTestSupport {
session.commit();
// Only pick up the first message.
Message message1 = message1 = consumer.receive(1000);
Message message1 = consumer.receive(1000);
assertNotNull(message1);
// Don't acknowledge yet. This should keep our prefetch full.

View File

@ -0,0 +1,215 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
/**
* @version $Revision: 1.3 $
*/
public class CursorDurableTest extends TestCase{
protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
protected static final int MESSAGE_COUNT=50;
protected static final int PREFETCH_SIZE = 5;
protected BrokerService broker;
protected String bindAddress="tcp://localhost:60706";
protected int topicCount=0;
public void testSendFirstThenConsume() throws Exception{
ConnectionFactory factory=createConnectionFactory();
Connection consumerConnection= getConsumerConnection(factory);
//create durable subs
MessageConsumer consumer = getConsumer(consumerConnection);
consumerConnection.close();
Connection producerConnection = factory.createConnection();
producerConnection.start();
Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(getTopic(session));
List senderList = new ArrayList();
for (int i =0; i < MESSAGE_COUNT; i++) {
Message msg=session.createTextMessage("test"+i);
senderList.add(msg);
producer.send(msg);
}
producerConnection.close();
//now consume the messages
consumerConnection= getConsumerConnection(factory);
//create durable subs
consumer = getConsumer(consumerConnection);
List consumerList = new ArrayList();
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message msg = consumer.receive();
consumerList.add(msg);
}
assertEquals(senderList,consumerList);
consumerConnection.close();
}
public void testSendWhilstConsume() throws Exception{
ConnectionFactory factory=createConnectionFactory();
Connection consumerConnection= getConsumerConnection(factory);
//create durable subs
MessageConsumer consumer = getConsumer(consumerConnection);
consumerConnection.close();
Connection producerConnection = factory.createConnection();
producerConnection.start();
Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(getTopic(session));
List senderList = new ArrayList();
for (int i =0; i < MESSAGE_COUNT/10; i++) {
TextMessage msg=session.createTextMessage("test"+i);
senderList.add(msg);
producer.send(msg);
}
//now consume the messages
consumerConnection= getConsumerConnection(factory);
//create durable subs
consumer = getConsumer(consumerConnection);
final List consumerList = new ArrayList();
final CountDownLatch latch = new CountDownLatch(1);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg){
try{
//sleep to act as a slow consumer
//which will force a mix of direct and polled dispatching
//using the cursor on the broker
Thread.sleep(50);
}catch(Exception e){
// TODO Auto-generated catch block
e.printStackTrace();
}
consumerList.add(msg);
if (consumerList.size()==MESSAGE_COUNT) {
latch.countDown();
}
}
});
for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
TextMessage msg=session.createTextMessage("test"+i);
senderList.add(msg);
producer.send(msg);
}
latch.await(300000,TimeUnit.MILLISECONDS);
assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0);
assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size());
assertEquals(senderList,consumerList);
producerConnection.close();
consumerConnection.close();
}
protected Topic getTopic(Session session) throws JMSException{
String topicName=getClass().getName();
return session.createTopic(topicName);
}
protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
Connection connection=fac.createConnection();
connection.setClientID("testConsumer");
connection.start();
return connection;
}
protected MessageConsumer getConsumer(Connection connection) throws Exception{
Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = getTopic(consumerSession);
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic,"testConsumer");
return consumer;
}
protected void setUp() throws Exception{
if(broker==null){
broker=createBroker();
}
super.setUp();
}
protected void tearDown() throws Exception{
super.tearDown();
if(broker!=null){
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
Properties props = new Properties();
props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE);
props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE);
cf.setProperties(props);
return cf;
}
protected BrokerService createBroker() throws Exception{
BrokerService answer=new BrokerService();
configureBroker(answer);
answer.setDeleteAllMessagesOnStartup(true);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception{
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}

View File

@ -0,0 +1,40 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.3 $
*/
public class KahaCursorDurableTest extends CursorDurableTest{
protected static final Log log = LogFactory.getLog(KahaCursorDurableTest.class);
protected void configureBroker(BrokerService answer) throws Exception{
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}