mirror of https://github.com/apache/activemq.git
added isStoreValid() method - used by AMQStore to determine if references need to be rebuilt
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@516408 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
82d19974d8
commit
dda3e2a429
|
@ -42,4 +42,10 @@ public interface ReferenceStoreAdapter extends PersistenceAdapter {
|
||||||
|
|
||||||
public Set<Integer> getReferenceFileIdsInUse() throws IOException;
|
public Set<Integer> getReferenceFileIdsInUse() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the store isn't valid, it can be recoverd at start-up
|
||||||
|
* @return true if the reference store is in a consistent state
|
||||||
|
*/
|
||||||
|
public boolean isStoreValid();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,7 +157,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter,UsageListener{
|
||||||
}
|
}
|
||||||
},"ActiveMQ Journal Checkpoint Worker");
|
},"ActiveMQ Journal Checkpoint Worker");
|
||||||
createTransactionStore();
|
createTransactionStore();
|
||||||
|
if(referenceStoreAdapter.isStoreValid()==false){
|
||||||
|
log.warn("The ReferenceStore is not valid - recovering ...");
|
||||||
recover();
|
recover();
|
||||||
|
log.info("Finished recovering the ReferenceStore");
|
||||||
|
}else {
|
||||||
|
Location location=writeTraceMessage("RECOVERED "+new Date(),true);
|
||||||
|
asyncDataManager.setMark(location,true);
|
||||||
|
}
|
||||||
// Do a checkpoint periodically.
|
// Do a checkpoint periodically.
|
||||||
periodicCheckpointTask=new Runnable(){
|
periodicCheckpointTask=new Runnable(){
|
||||||
|
|
||||||
|
|
|
@ -58,18 +58,21 @@ public class TopicSubContainer {
|
||||||
return listContainer.placeLast(ref);
|
return listContainer.placeLast(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConsumerMessageRef remove(MessageId id) {
|
public ConsumerMessageRef remove(MessageId id){
|
||||||
ConsumerMessageRef result = null;
|
ConsumerMessageRef result=null;
|
||||||
if (!listContainer.isEmpty()) {
|
if(!listContainer.isEmpty()){
|
||||||
for (StoreEntry entry = listContainer.getFirst(); entry != null; entry = listContainer.getNext(entry)) {
|
StoreEntry entry=listContainer.getFirst();
|
||||||
ConsumerMessageRef ref = (ConsumerMessageRef) listContainer.get(entry);
|
while(entry!=null){
|
||||||
if (ref != null && ref.getMessageId().equals(id)) {
|
ConsumerMessageRef ref=(ConsumerMessageRef)listContainer.get(entry);
|
||||||
listContainer.remove(entry);
|
listContainer.remove(entry);
|
||||||
result = ref;
|
if(ref!=null&&ref.getMessageId().equals(id)){
|
||||||
if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
|
result=ref;
|
||||||
|
if(listContainer!=null&&batchEntry!=null&&(listContainer.isEmpty()||batchEntry.equals(entry))){
|
||||||
reset();
|
reset();
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
entry=listContainer.getFirst();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
Loading…
Reference in New Issue