Adding persitent cursor support for Queues

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476101 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-17 10:33:57 +00:00
parent 3b28c7c3ec
commit 629bc81fb2
25 changed files with 975 additions and 524 deletions

View File

@ -0,0 +1,142 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import javax.jms.JMSException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a
* consumer) cursor
*
* @version $Revision: 474985 $
*/
class QueueStorePrefetch extends AbstractPendingMessageCursor implements
MessageRecoveryListener {
static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
private MessageStore store;
private final LinkedList batchList=new LinkedList();
private Destination regionDestination;
/**
* @param topic
* @param clientId
* @param subscriberName
* @throws IOException
*/
public QueueStorePrefetch(Queue queue){
this.regionDestination = queue;
this.store=(MessageStore)queue.getMessageStore();
}
public void start() throws Exception{
}
public void stop() throws Exception{
store.resetBatching();
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return batchList.isEmpty();
}
public synchronized int size(){
try {
return store.getMessageCount();
}catch(IOException e) {
log.error("Failed to get message count",e);
throw new RuntimeException(e);
}
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
node.decrementReferenceCount();
}
}
public synchronized boolean hasNext(){
if(isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
}
return !isEmpty();
}
public synchronized MessageReference next(){
Message result = (Message)batchList.removeFirst();
result.setRegionDestination(regionDestination);
return result;
}
public void reset(){
}
// MessageRecoveryListener implementation
public void finished(){
}
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
batchList.addLast(message);
}
public void recoverMessageReference(String messageReference)
throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
// implementation
protected void fillBatch() throws Exception{
store.recoverNextMessages(maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message)batchList.getLast();
}
}
public String toString() {
return "QueueStorePrefetch" + System.identityHashCode(this) ;
}
}

View File

@ -27,7 +27,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
@ -142,12 +141,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
if(tsp!=null){
tsp.addMessageLast(node);
if(started){
// if the store has been empty - then this message is next to dispatch
if((pendingCount-nonPersistent.size())<=0){
tsp.nextToDispatch(node.getMessageId());
}
}
}
}
}
@ -190,6 +183,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
public synchronized void reset(){
nonPersistent.reset();
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.reset();
@ -199,21 +193,27 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
public int size(){
return pendingCount;
}
public synchronized void setMaxBatchSize(int maxBatchSize){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.setMaxBatchSize(maxBatchSize);
}
super.setMaxBatchSize(maxBatchSize);
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
currentCursor=null;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.setMaxBatchSize(getMaxBatchSize());
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
if(tsp.hasNext()){
currentCursor=tsp;
break;
}
}
// round-robin
Object obj=storePrefetches.removeFirst();
storePrefetches.addLast(obj);
storePrefetches.addLast(storePrefetches.removeFirst());
}
return currentCursor;
}

View File

@ -0,0 +1,170 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Store based Cursor for Queues
*
* @version $Revision: 474985 $
*/
public class StoreQueueCursor extends AbstractPendingMessageCursor{
static private final Log log=LogFactory.getLog(StoreQueueCursor.class);
private int pendingCount=0;
private Queue queue;
private Store tmpStore;
private PendingMessageCursor nonPersistent;
private QueueStorePrefetch persistent;
private boolean started;
private PendingMessageCursor currentCursor;
/**
* Construct
*
* @param queue
* @param tmpStore
*/
public StoreQueueCursor(Queue queue,Store tmpStore){
this.queue=queue;
this.tmpStore=tmpStore;
this.persistent=new QueueStorePrefetch(queue);
}
public synchronized void start() throws Exception{
started=true;
if(nonPersistent==null){
nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
nonPersistent.setMaxBatchSize(getMaxBatchSize());
}
nonPersistent.start();
pendingCount=persistent.size();
}
public synchronized void stop() throws Exception{
started=false;
if(nonPersistent!=null){
nonPersistent.stop();
}
pendingCount=0;
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
Message msg=node.getMessage();
if(started){
pendingCount++;
if(!msg.isPersistent()){
nonPersistent.addMessageLast(node);
}
}
if(msg.isPersistent()){
persistent.addMessageLast(node);
}
}
}
public void clear(){
pendingCount=0;
}
public synchronized boolean hasNext(){
boolean result=pendingCount>0;
if(result){
try{
currentCursor=getNextCursor();
}catch(Exception e){
log.error("Failed to get current cursor ",e);
throw new RuntimeException(e);
}
result=currentCursor!=null?currentCursor.hasNext():false;
}
return result;
}
public synchronized MessageReference next(){
return currentCursor!=null?currentCursor.next():null;
}
public synchronized void remove(){
if(currentCursor!=null){
currentCursor.remove();
}
pendingCount--;
}
public void remove(MessageReference node){
pendingCount--;
}
public synchronized void reset(){
nonPersistent.reset();
}
public int size(){
return pendingCount;
}
public synchronized boolean isEmpty(){
return pendingCount<=0;
}
/**
* Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
* may do
*
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired(){
return false;
}
/**
* @return the nonPersistent Cursor
*/
public PendingMessageCursor getNonPersistent(){
return this.nonPersistent;
}
/**
* @param nonPersistent cursor to set
*/
public void setNonPersistent(PendingMessageCursor nonPersistent){
this.nonPersistent=nonPersistent;
}
public void setMaxBatchSize(int maxBatchSize){
persistent.setMaxBatchSize(maxBatchSize);
if(nonPersistent!=null){
nonPersistent.setMaxBatchSize(maxBatchSize);
}
super.setMaxBatchSize(maxBatchSize);
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
currentCursor = currentCursor == persistent ? nonPersistent : persistent;
}
return currentCursor;
}
}

View File

@ -46,7 +46,6 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
private final LinkedList batchList=new LinkedList();
private String clientId;
private String subscriberName;
private MessageId lastMessageId;
private Destination regionDestination;
/**
@ -66,7 +65,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
}
public void stop() throws Exception{
store.resetBatching(clientId,clientId,null);
store.resetBatching(clientId,subscriberName);
}
/**
@ -130,12 +129,12 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
// implementation
protected void fillBatch() throws Exception{
store.recoverNextMessages(clientId,subscriberName,lastMessageId,
store.recoverNextMessages(clientId,subscriberName,
maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message)batchList.getLast();
lastMessageId=message.getMessageId();
}
}
@ -143,8 +142,4 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
}
synchronized void nextToDispatch(MessageId id) throws Exception {
lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
store.resetBatching(clientId,clientId,id);
}
}

View File

@ -1,24 +1,20 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -28,70 +24,84 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
/**
* Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer}
* Represents a message store which is used by the persistent
* implementations
*
*
* @version $Revision: 1.5 $
*/
public interface MessageStore extends Service {
public interface MessageStore extends Service{
/**
* Adds a message to the message store
* @param context TODO
*
* @param context context
* @param message
* @throws IOException
*/
public void addMessage(ConnectionContext context, Message message) throws IOException;
public void addMessage(ConnectionContext context,Message message) throws IOException;
/**
* Adds a message reference to the message store
* @param context TODO
* @param messageId TODO
* @param expirationTime TODO
*
* @param context
* @param messageId
* @param expirationTime
* @param messageRef
* @throws IOException
*/
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException;
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException;
/**
* Looks up a message using either the String messageID or
* the messageNumber. Implementations are encouraged to fill in the missing
* key if its easy to do so.
* Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
* in the missing key if its easy to do so.
*
* @param identity which contains either the messageID or the messageNumber
* @return the message or null if it does not exist
* @throws IOException
*/
public Message getMessage(MessageId identity) throws IOException;
/**
* Looks up a message using either the String messageID or
* the messageNumber. Implementations are encouraged to fill in the missing
* key if its easy to do so.
* Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
* in the missing key if its easy to do so.
*
* @param identity which contains either the messageID or the messageNumber
* @return the message or null if it does not exist
* @throws IOException
*/
public String getMessageReference(MessageId identity) throws IOException;
/**
* Removes a message from the message store.
* @param context TODO
* @param ack the ack request that cause the message to be removed. It conatins
* the identity which contains the messageID of the message that needs to be removed.
*
* @param context
* @param ack the ack request that cause the message to be removed. It conatins the identity which contains the
* messageID of the message that needs to be removed.
* @throws IOException
*/
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException;
/**
* Removes all the messages from the message store.
* @param context TODO
*
* @param context
* @throws IOException
*/
public void removeAllMessages(ConnectionContext context) throws IOException;
/**
* Recover any messages to be delivered.
*
*
* @param container
* @throws Exception
* @throws Exception
*/
public void recover(MessageRecoveryListener container) throws Exception;
/**
* The destination that the message store is holding messages for.
* @return
*
* @return the destination
*/
public ActiveMQDestination getDestination();
@ -100,4 +110,23 @@ public interface MessageStore extends Service {
*/
public void setUsageManager(UsageManager usageManager);
/**
* @return the number of messages ready to deliver
* @throws IOException
*
*/
public int getMessageCount() throws IOException;
/**
* A hint to the Store to reset any batching state for the Destination
*
* @param nextToDispatch
*
*/
public void resetBatching();
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener)
throws Exception;
}

View File

@ -104,6 +104,4 @@ public interface PersistenceAdapter extends Service {
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
}

View File

@ -77,4 +77,20 @@ public class ProxyMessageStore implements MessageStore {
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
public int getMessageCount() throws IOException{
return delegate.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
delegate.recoverNextMessages(maxReturned,listener);
}
public void resetBatching(){
delegate.resetBatching();
}
}

View File

@ -82,22 +82,15 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
delegate.recoverSubscription(clientId, subscriptionName, listener);
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
delegate.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
}
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
delegate.resetBatching(clientId,subscriptionName,id);
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
public void resetBatching(String clientId,String subscriptionName) {
delegate.resetBatching(clientId,subscriptionName);
}
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@ -120,4 +113,19 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public int getMessageCount(String clientId,String subscriberName) throws IOException{
return delegate.getMessageCount(clientId,subscriberName);
}
public int getMessageCount() throws IOException{
return delegate.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
delegate.recoverNextMessages(maxReturned,listener);
}
public void resetBatching(){
delegate.resetBatching();
}
}

View File

@ -69,46 +69,21 @@ public interface TopicMessageStore extends MessageStore{
*
* @param clientId
* @param subscriptionName
* @param lastMessageId
* @param maxReturned
* @param listener
*
* @throws Exception
*/
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
* @param clientId
* @param subscriptionName
* @param nextToDispatch
*
*/
public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
/**
* Get the next messageId to deliver to a subscriber after the MessageId provided
* @param clientId
* @param subscriptionName
* @param id
* @return the next messageId or null
* @throws IOException
* @throws Exception
*/
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
/**
* Get the previous messageId to deliver to a subscriber before the MessageId provided
* @param clientId
* @param subscriptionName
* @param id
* @return the next messageId or null
* @throws IOException
* @throws Exception
*/
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
public void resetBatching(String clientId,String subscriptionName);
/**

View File

@ -89,4 +89,9 @@ public interface JDBCAdapter{
public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
}
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -33,6 +33,7 @@ import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.10 $
*/
@ -42,6 +43,7 @@ public class JDBCMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
ActiveMQDestination destination) {
@ -201,4 +203,67 @@ public class JDBCMessageStore implements MessageStore {
public void setUsageManager(UsageManager usageManager) {
// we can ignore since we don't buffer up messages.
}
public int getMessageCount() throws IOException{
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.doGetMessageCount(c, destination);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
/**
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
*/
public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception{
TransactionContext c=persistenceAdapter.getTransactionContext();
try{
adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
lastMessageId.set(sequenceId);
}
public void recoverMessageReference(String reference) throws Exception{
listener.recoverMessageReference(reference);
}
public void finished(){
listener.finished();
}
});
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
}finally{
c.close();
}
}
/**
*
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
public void resetBatching(){
lastMessageId.set(-1);
}
}

View File

@ -19,7 +19,9 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@ -30,13 +32,14 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision: 1.6 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
private Map subscriberLastMessageMap=new ConcurrentHashMap();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
ActiveMQTopic topic) {
super(persistenceAdapter, adapter, wireFormat, topic);
@ -90,35 +93,46 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
public synchronized void recoverNextMessages(final String clientId,final String subscriptionName,
final int maxReturned,final MessageRecoveryListener listener) throws Exception{
TransactionContext c=persistenceAdapter.getTransactionContext();
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
if(last==null){
last=new AtomicLong(-1);
subscriberLastMessageMap.put(subcriberId,last);
}
final AtomicLong finalLast=last;
try{
adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
finalLast.set(sequenceId);
}
public void recoverMessageReference(String reference) throws Exception {
public void recoverMessageReference(String reference) throws Exception{
listener.recoverMessageReference(reference);
}
public void finished(){
listener.finished();
}
});
} catch (SQLException e) {
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
} finally {
}finally{
c.close();
last.set(finalLast.get());
}
}
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
public void resetBatching(String clientId,String subscriptionName) {
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
subscriberLastMessageMap.remove(subcriberId);
}
/**
@ -165,6 +179,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
resetBatching(clientId,subscriptionName);
}
}
@ -180,76 +195,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
final MessageId result = new MessageId();
final AtomicBoolean initalized = new AtomicBoolean();
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
long sequence = id != null ? id.getBrokerSequenceId() : -1;
adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
initalized.set(true);
}
public void recoverMessageReference(String reference) throws Exception {
result.setValue(reference);
initalized.set(true);
}
public void finished(){
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return initalized.get () ? result : null;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
final MessageId result = new MessageId();
final AtomicBoolean initalized = new AtomicBoolean();
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
long sequence = id != null ? id.getBrokerSequenceId() : -1;
adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
public void recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
result.setProducerId(msg.getMessageId().getProducerId());
result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
initalized.set(true);
}
public void recoverMessageReference(String reference) throws Exception {
result.setValue(reference);
initalized.set(true);
}
public void finished(){
}
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
return initalized.get () ? result : null;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
int result = 0;
@ -265,6 +213,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
return result;
}
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}

View File

@ -66,6 +66,8 @@ public class Statements {
private String durableSubscriberMessageCountStatement;
private String nextDurableSubscriberMessageIdStatement;
private String prevDurableSubscriberMessageIdStatement;
private String destinationMessageCountStatement;
private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@ -338,6 +340,29 @@ public class Statements {
}
return lockUpdateStatement;
}
/**
* @return the destinationMessageCountStatement
*/
public String getDestinationMessageCountStatement(){
if (destinationMessageCountStatement==null) {
destinationMessageCountStatement= "SELECT COUNT(*) FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?";
}
return destinationMessageCountStatement;
}
/**
* @return the findNextMessagesStatement
*/
public String getFindNextMessagesStatement(){
if(findNextMessagesStatement == null) {
findNextMessagesStatement="SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
}
return findNextMessagesStatement;
}
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
@ -627,4 +652,22 @@ public class Statements {
public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
}
}
/**
* @param findNextMessagesStatement the findNextMessagesStatement to set
*/
public void setFindNextMessagesStatement(String findNextMessagesStatement){
this.findNextMessagesStatement=findNextMessagesStatement;
}
/**
* @param destinationMessageCountStatement the destinationMessageCountStatement to set
*/
public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
this.destinationMessageCountStatement=destinationMessageCountStatement;
}
}

View File

@ -677,6 +677,54 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
close(s);
}
}
public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
PreparedStatement s=null;
ResultSet rs=null;
int result=0;
try{
s=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
s.setString(1,destination.getQualifiedName());
rs=s.executeQuery();
if(rs.next()){
result=rs.getInt(1);
}
}finally{
close(rs);
close(s);
}
return result;
}
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
PreparedStatement s=null;
ResultSet rs=null;
try{
s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
s.setString(1,destination.getQualifiedName());
s.setLong(4,nextSeq);
rs=s.executeQuery();
int count=0;
if(statements.isUseExternalMessageReferences()){
while(rs.next()&&count<maxReturned){
listener.recoverMessageReference(rs.getString(1));
count++;
}
}else{
while(rs.next()&&count<maxReturned){
listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
count++;
}
}
}finally{
close(rs);
close(s);
listener.finished();
}
}
/*
* Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
* subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
@ -700,4 +748,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
* try { s.close(); } catch (Throwable ignore) {} } }
*/
}

View File

@ -381,4 +381,27 @@ public class JournalMessageStore implements MessageStore {
throw new IOException("The journal does not support message references.");
}
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException{
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned,listener);
}
public void resetBatching(){
longTermStore.resetBatching();
}
}

View File

@ -58,9 +58,9 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
}
@ -190,25 +190,16 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
return longTermStore.getAllSubscriptions();
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) {
longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch);
public void resetBatching(String clientId,String subscriptionName) {
longTermStore.resetBatching(clientId,subscriptionName);
}
}
}

View File

@ -415,4 +415,23 @@ public class QuickJournalMessageStore implements MessageStore {
throw new IOException("The journal does not support message references.");
}
}
public int getMessageCount() throws IOException{
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned,listener);
}
public void resetBatching(){
longTermStore.resetBatching();
}
}

View File

@ -72,9 +72,9 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
public void recoverNextMessages(String clientId,String subscriptionName, int maxReturned,final MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
longTermStore.recoverNextMessages(clientId, subscriptionName,maxReturned,new MessageRecoveryListener() {
public void recoverMessage(Message message) throws Exception {
throw new IOException("Should not get called.");
}
@ -217,26 +217,16 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
return longTermStore.getAllSubscriptions();
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextId) {
longTermStore.resetBatching(clientId,subscriptionName,nextId);
public void resetBatching(String clientId,String subscriptionName) {
longTermStore.resetBatching(clientId,subscriptionName);
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
@ -35,10 +36,12 @@ import org.apache.activemq.util.LRUCache;
*
* @version $Revision: 1.7 $
*/
public class KahaMessageStore implements MessageStore{
public class KahaMessageStore implements MessageStore, UsageListener{
protected final ActiveMQDestination destination;
protected final ListContainer messageContainer;
protected StoreEntry batchEntry = null;
protected final LRUCache cache;
protected UsageManager usageManager;
public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
this.messageContainer=container;
@ -73,19 +76,19 @@ public class KahaMessageStore implements MessageStore{
public synchronized Message getMessage(MessageId identity) throws IOException{
Message result=null;
StoreEntry entry=(StoreEntry)cache.remove(identity);
StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
result = (Message)messageContainer.get(entry);
}else{
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(identity)){
result=msg;
break;
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(identity)){
result=msg;
cache.put(identity,msg);
break;
}
}
}
}
return result;
}
@ -102,10 +105,10 @@ public class KahaMessageStore implements MessageStore{
if(entry!=null){
messageContainer.remove(entry);
}else{
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(msgId)){
i.remove();
messageContainer.remove(entry);
break;
}
}
@ -119,9 +122,15 @@ public class KahaMessageStore implements MessageStore{
listener.finished();
}
public void start() {}
public void start() {
if( this.usageManager != null )
this.usageManager.addUsageListener(this);
}
public void stop() {}
public void stop() {
if( this.usageManager != null )
this.usageManager.removeUsageListener(this);
}
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
@ -141,6 +150,91 @@ public class KahaMessageStore implements MessageStore{
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
/**
* @return the number of messages held by this destination
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount(){
return messageContainer.size();
}
/**
* @param id
* @return null
* @throws Exception
* @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
*/
public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception{
return null;
}
/**
* @param lastMessageId
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener)
*/
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry = batchEntry;
if (entry == null) {
entry= messageContainer.getFirst();
}else {
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
}
if(entry!=null){
int count = 0;
do{
Object msg=messageContainer.get(entry);
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
listener.recoverMessageReference(ref);
}else{
Message message=(Message)msg;
listener.recoverMessage(message);
}
count++;
}
batchEntry = entry;
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned);
}
listener.finished();
}
/**
* @param nextToDispatch
* @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
*/
public void resetBatching(){
batchEntry = null;
}
/**
* @return true if the store supports cursors
*/
public boolean isSupportForCursors() {
return true;
}
/**
* @param memoryManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
*/
public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if (newPercentUsage == 100) {
cache.clear();
}
}
}

View File

@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -37,7 +38,7 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
@ -106,7 +107,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);

View File

@ -17,6 +17,7 @@ package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -31,24 +32,20 @@ import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import java.util.concurrent.ConcurrentHashMap;
/**
* @version $Revision: 1.5 $
*/
public class KahaTopicMessageStore implements TopicMessageStore{
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
private ActiveMQDestination destination;
private ListContainer ackContainer;
private ListContainer messageContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
this.messageContainer=messageContainer;
this.destination=destination;
MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
super(messageContainer,destination,maximumCacheSize);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@ -159,7 +156,7 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
@ -195,7 +192,7 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
public void delete(){
messageContainer.clear();
super.delete();
ackContainer.clear();
subscriberContainer.clear();
}
@ -322,82 +319,16 @@ public class KahaTopicMessageStore implements TopicMessageStore{
}
}
/**
* @param usageManager
* @see org.apache.activemq.store.MessageStore#setUsageManager(org.apache.activemq.memory.UsageManager)
*/
public void setUsageManager(UsageManager usageManager){
// TODO Auto-generated method stub
}
/**
* @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception{
// TODO Auto-generated method stub
}
/**
* @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception{
// TODO Auto-generated method stub
}
/**
* @param clientId
* @param subscriptionName
* @see org.apache.activemq.store.TopicMessageStore#resetBatching(java.lang.String, java.lang.String)
*/
public synchronized void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch){
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
if(topicSubContainer!=null){
topicSubContainer.reset();
if(nextToDispatch!=null){
StoreEntry entry=topicSubContainer.getListContainer().getFirst();
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)topicSubContainer.getListContainer().get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
if(msg.toString().equals(nextToDispatch.toString())){
// need to set the entry to the previous one
// to ensure we start in the right place
topicSubContainer
.setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
break;
}
}else{
Message message=(Message)msg;
if(message!=null&&message.getMessageId().equals(nextToDispatch)){
// need to set the entry to the previous one
// to ensure we start in the right place
topicSubContainer
.setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
break;
}
}
}
entry=topicSubContainer.getListContainer().getNext(entry);
}while(entry!=null);
}
}
}
/**
* @param clientId
* @param subscriptionName
* @param id
* @return next messageId
* @throws IOException
* @see org.apache.activemq.store.TopicMessageStore#getNextMessageIdToDeliver(java.lang.String, java.lang.String,
* org.apache.activemq.command.MessageId)
*/
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{
// TODO Auto-generated method stub
return null;
}

View File

@ -1,20 +1,17 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
@ -22,7 +19,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -34,82 +30,97 @@ import org.apache.activemq.store.MessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
*
*
* @version $Revision: 1.7 $
*/
public class MemoryMessageStore implements MessageStore {
public class MemoryMessageStore implements MessageStore{
protected final ActiveMQDestination destination;
protected final Map messageTable;
public MemoryMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap());
public MemoryMessageStore(ActiveMQDestination destination){
this(destination,new LinkedHashMap());
}
public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
this.destination = destination;
this.messageTable = Collections.synchronizedMap(messageTable);
public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){
this.destination=destination;
this.messageTable=Collections.synchronizedMap(messageTable);
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
messageTable.put(message.getMessageId(), message);
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
messageTable.put(messageId, messageRef);
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
messageTable.put(message.getMessageId(),message);
}
public Message getMessage(MessageId identity) throws IOException {
return (Message) messageTable.get(identity);
}
public String getMessageReference(MessageId identity) throws IOException {
return (String) messageTable.get(identity);
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
messageTable.put(messageId,messageRef);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
public Message getMessage(MessageId identity) throws IOException{
return (Message)messageTable.get(identity);
}
public String getMessageReference(MessageId identity) throws IOException{
return (String)messageTable.get(identity);
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
messageTable.remove(ack.getLastMessageId());
}
public void removeMessage(MessageId msgId) throws IOException {
public void removeMessage(MessageId msgId) throws IOException{
messageTable.remove(msgId);
}
public void recover(MessageRecoveryListener listener) throws Exception {
public void recover(MessageRecoveryListener listener) throws Exception{
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
Object msg=(Object) iter.next();
Object msg=(Object)iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message) msg);
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
}
public void start() {
public void start(){
}
public void stop() {
public void stop(){
}
public void removeAllMessages(ConnectionContext context) throws IOException {
public void removeAllMessages(ConnectionContext context) throws IOException{
messageTable.clear();
}
public ActiveMQDestination getDestination() {
public ActiveMQDestination getDestination(){
return destination;
}
public void delete() {
public void delete(){
messageTable.clear();
}
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(UsageManager usageManager){
}
public int getMessageCount(){
return messageTable.size();
}
public void resetBatching(MessageId nextToDispatch){
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
}
public void resetBatching(){
}
}

View File

@ -1,20 +1,17 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
@ -24,7 +21,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -37,72 +33,76 @@ import org.apache.activemq.util.SubscriptionKey;
/**
* @version $Revision: 1.5 $
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
private Map ackDatabase;
private Map subscriberDatabase;
MessageId lastMessageId;
public MemoryTopicMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap(), makeMap(), makeMap());
public MemoryTopicMessageStore(ActiveMQDestination destination){
this(destination,new LinkedHashMap(),makeMap(),makeMap());
}
protected static Map makeMap() {
protected static Map makeMap(){
return Collections.synchronizedMap(new HashMap());
}
public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.ackDatabase = ackDatabase;
public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
Map ackDatabase){
super(destination,messageTable);
this.subscriberDatabase=subscriberDatabase;
this.ackDatabase=ackDatabase;
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
lastMessageId = message.getMessageId();
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
super.addMessage(context,message);
lastMessageId=message.getMessageId();
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId);
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
throws IOException{
ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
SubscriptionInfo info = new SubscriptionInfo();
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.put(key, info);
MessageId l=retroactive ? null : lastMessageId;
if( l!=null ) {
ackDatabase.put(key, l);
SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
subscriberDatabase.put(key,info);
MessageId l=retroactive?null:lastMessageId;
if(l!=null){
ackDatabase.put(key,l);
}
}
public void deleteSubscription(String clientId, String subscriptionName) {
org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
public void deleteSubscription(String clientId,String subscriptionName){
org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
ackDatabase.remove(key);
subscriberDatabase.remove(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
throws Exception{
MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean pastLastAck=lastAck==null;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
Map.Entry entry=(Entry)iter.next();
if(pastLastAck){
Object msg=entry.getValue();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message) msg);
listener.recoverMessage((Message)msg);
}
}else{
pastLastAck=entry.getKey().equals(lastAck);
@ -111,92 +111,40 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
listener.finished();
}
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean startFound=false;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
int count = 0;
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() && count < maxReturned;){
Map.Entry entry=(Entry) iter.next();
Object msg=entry.getValue();
if(msg.getClass()==String.class){
String ref=msg.toString();
if(startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}else if (startFound){
listener.recoverMessageReference(ref);
count++;
}
}else{
Message message=(Message) msg;
if(startFound||message.getMessageId().equals(lastMessageId)){
startFound=true;
}else if (startFound){
listener.recoverMessage(message);
count++;
}
}
}
listener.finished();
}
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception{
listener.finished();
}
public void delete() {
public void delete(){
super.delete();
ackDatabase.clear();
subscriberDatabase.clear();
lastMessageId=null;
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// the message table is a synchronizedMap - so just have to synchronize here
boolean matchFound = false;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(!matchFound && entry.getKey().equals(id)){
matchFound = true;
}else if (matchFound) {
Message msg = (Message) entry.getValue();
return msg.getMessageId();
}
}
}
return null;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// the message table is a synchronizedMap - so just have to synchronize here
Message last= null;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(entry.getKey().equals(id)){
return last != null ? last.getMessageId() : null;
}else {
last = (Message)entry.getValue();
}
}
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
throws IOException{
return null;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
int result = 0;
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
int result=0;
MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
result = messageTable.size();
result=messageTable.size();
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
Map.Entry entry=(Entry)iter.next();
if(entry.getKey().equals(lastAck)){
break;
}
@ -205,8 +153,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
return result;
}
public void resetBatching(String clientId,String subscriptionName,MessageId id) {
public void resetBatching(String clientId,String subscriptionName,MessageId id){
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
}
public void resetBatching(String clientId,String subscriptionName){
}
}

View File

@ -287,4 +287,16 @@ public class RapidMessageStore implements MessageStore {
}
}
}
public int getMessageCount(){
return 0;
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
}
public void resetBatching(){
}
}

View File

@ -20,6 +20,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
@ -38,8 +40,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A MessageStore that uses a Journal to store it's messages.
@ -312,44 +313,6 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
subscriberAcks.put(key,container);
}
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
throws IOException{
MessageId result=null;
boolean getNext=false;
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer)subscriberAcks.get(key);
Iterator iter=list.iterator();
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
if(id.equals(messageId.toString())){
getNext=true;
}else if(getNext){
result=new MessageId(id);
break;
}
}
return result;
}
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
throws IOException{
MessageId result=null;
String previousId=null;
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer)subscriberAcks.get(key);
Iterator iter=list.iterator();
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
if(id.equals(messageId.toString())){
if(previousId!=null){
result=new MessageId(previousId);
}
break;
}
previousId=id;
}
return result;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
@ -359,4 +322,16 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
}
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
}
public void resetBatching(String clientId,String subscriptionName){
}
}