diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java index cbdca4aa97..160a3fface 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java @@ -41,5 +41,11 @@ public interface ReferenceStoreAdapter extends PersistenceAdapter { public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException; public Set getReferenceFileIdsInUse() throws IOException; + + /** + * If the store isn't valid, it can be recoverd at start-up + * @return true if the reference store is in a consistent state + */ + public boolean isStoreValid(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 2829d4e0aa..8176090202 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -157,7 +157,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter,UsageListener{ } },"ActiveMQ Journal Checkpoint Worker"); createTransactionStore(); - recover(); + if(referenceStoreAdapter.isStoreValid()==false){ + log.warn("The ReferenceStore is not valid - recovering ..."); + recover(); + log.info("Finished recovering the ReferenceStore"); + }else { + Location location=writeTraceMessage("RECOVERED "+new Date(),true); + asyncDataManager.setMark(location,true); + } // Do a checkpoint periodically. periodicCheckpointTask=new Runnable(){ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index 5a2b4463d2..1c8cf0b555 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -58,18 +58,21 @@ public class TopicSubContainer { return listContainer.placeLast(ref); } - public ConsumerMessageRef remove(MessageId id) { - ConsumerMessageRef result = null; - if (!listContainer.isEmpty()) { - for (StoreEntry entry = listContainer.getFirst(); entry != null; entry = listContainer.getNext(entry)) { - ConsumerMessageRef ref = (ConsumerMessageRef) listContainer.get(entry); - if (ref != null && ref.getMessageId().equals(id)) { - listContainer.remove(entry); - result = ref; - if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) { + public ConsumerMessageRef remove(MessageId id){ + ConsumerMessageRef result=null; + if(!listContainer.isEmpty()){ + StoreEntry entry=listContainer.getFirst(); + while(entry!=null){ + ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry); + listContainer.remove(entry); + if(ref!=null&&ref.getMessageId().equals(id)){ + result=ref; + if(listContainer!=null&&batchEntry!=null&&(listContainer.isEmpty()||batchEntry.equals(entry))){ reset(); } + break; } + entry=listContainer.getFirst(); } } return result;