diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 1360f6ba7d..87f5df8f55 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -385,7 +385,7 @@ public class BrokerService implements Service, Serializable { startDestinations(); addShutdownHook(); - log.info("Using Persistence Adaptor " + getPersistenceAdapter()); + log.info("Using Persistence Adaptor: " + getPersistenceAdapter()); if (deleteAllMessagesOnStartup) { deleteAllMessages(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 1464b5e45c..bef30a39a0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -59,7 +59,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } nonPersistent.start(); persistent.start(); - pendingCount=persistent.size(); + pendingCount=persistent.size() + nonPersistent.size(); } public synchronized void stop() throws Exception{ @@ -87,12 +87,28 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } } } + + public void addMessageFirst(MessageReference node) throws Exception{ + if(node!=null){ + Message msg=node.getMessage(); + if(started){ + pendingCount++; + if(!msg.isPersistent()){ + nonPersistent.addMessageFirst(node); + } + } + if(msg.isPersistent()){ + persistent.addMessageFirst(node); + } + } + } public void clear(){ pendingCount=0; } public synchronized boolean hasNext(){ + boolean result=pendingCount>0; if(result){ try{ @@ -107,7 +123,8 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } public synchronized MessageReference next(){ - return currentCursor!=null?currentCursor.next():null; + MessageReference result = currentCursor!=null?currentCursor.next():null; + return result; } public synchronized void remove(){ @@ -118,6 +135,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } public void remove(MessageReference node){ + if (!node.isPersistent()) { + nonPersistent.remove(node); + }else { + persistent.remove(node); + } pendingCount--; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java index b7072847cf..1d5b489231 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java @@ -88,5 +88,9 @@ public class DataSourceSupport { ds.setCreateDatabase("create"); return ds; } + + public String toString(){ + return ""+dataSource; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index bd9772daf8..40bee0f4c3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -482,4 +482,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected DatabaseLocker createDatabaseLocker() throws IOException { return new DefaultDatabaseLocker(getDataSource(), getStatements()); } + + public String toString(){ + return "JDBCPersistenceAdaptor("+super.toString()+")"; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 4c717527cc..0a5023ab79 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -669,5 +669,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); } + + public String toString(){ + return "JournalPersistenceAdapator(" + longTermPersistence + ")"; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index 76bd9386da..b3ef1b3494 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -270,4 +270,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ String name=dir.getAbsolutePath()+File.separator+"kahadb"; return name; } + + public String toString(){ + return "KahaPersistenceAdapter(" + getStoreName() +")"; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 37798e5463..88165144c4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -50,12 +50,16 @@ public class MemoryMessageStore implements MessageStore{ } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ - messageTable.put(message.getMessageId(),message); + synchronized(messageTable){ + messageTable.put(message.getMessageId(),message); + } } public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{ - messageTable.put(messageId,messageRef); + synchronized(messageTable){ + messageTable.put(messageId,messageRef); + } } public Message getMessage(MessageId identity) throws IOException{ @@ -67,11 +71,16 @@ public class MemoryMessageStore implements MessageStore{ } public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ - messageTable.remove(ack.getLastMessageId()); + removeMessage(ack.getLastMessageId()); } public void removeMessage(MessageId msgId) throws IOException{ - messageTable.remove(msgId); + synchronized(messageTable){ + messageTable.remove(msgId); + if(lastBatchId!=null&lastBatchId.equals(msgId)){ + lastBatchId=null; + } + } } public void recover(MessageRecoveryListener listener) throws Exception{ @@ -96,7 +105,9 @@ public class MemoryMessageStore implements MessageStore{ } public void removeAllMessages(ConnectionContext context) throws IOException{ - messageTable.clear(); + synchronized(messageTable){ + messageTable.clear(); + } } public ActiveMQDestination getDestination(){ @@ -104,7 +115,9 @@ public class MemoryMessageStore implements MessageStore{ } public void delete(){ - messageTable.clear(); + synchronized(messageTable){ + messageTable.clear(); + } } /** @@ -117,18 +130,16 @@ public class MemoryMessageStore implements MessageStore{ return messageTable.size(); } - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ synchronized(messageTable){ - boolean pastLackBatch=lastBatchId==null; - int count = 0; + int count=0; for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ Map.Entry entry=(Entry)iter.next(); if(pastLackBatch){ count++; Object msg=entry.getValue(); - lastBatchId = (MessageId)entry.getKey(); + lastBatchId=(MessageId)entry.getKey(); if(msg.getClass()==String.class){ listener.recoverMessageReference((String)msg); }else{ @@ -143,6 +154,6 @@ public class MemoryMessageStore implements MessageStore{ } public void resetBatching(){ - lastBatchId = null; + lastBatchId=null; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 07f9f1d24a..3a46b2415b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -154,4 +154,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { */ public void setUsageManager(UsageManager usageManager) { } + + public String toString(){ + return "MemoryPersistenceAdapter"; + } }