ensure state is recovered if data is corrupted and has to be regenerated from transaction logs

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@549887 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-06-22 17:22:19 +00:00
parent 0112df5605
commit 2681bb89ee
5 changed files with 116 additions and 38 deletions

View File

@ -47,5 +47,18 @@ public interface ReferenceStoreAdapter extends PersistenceAdapter {
* @return true if the reference store is in a consistent state
*/
public boolean isStoreValid();
/**
* called by recover to clear out message references
* @throws IOException
*/
public void clearMessages() throws IOException;
/**
* recover any state
* @throws IOException
*
*/
public void recoverState() throws IOException;
}

View File

@ -183,16 +183,17 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// need to be recovered when the broker starts up. Perhaps on a graceful shutdown we
// should record all the in flight XA transactions to a file to avoid having to scan
// the entire transaction log. For now going to comment this bit out.
//
// if(referenceStoreAdapter.isStoreValid()==false){
// log.warn("The ReferenceStore is not valid - recovering ...");
// recover();
// log.info("Finished recovering the ReferenceStore");
// }else {
// Location location=writeTraceMessage("RECOVERED "+new Date(),true);
// asyncDataManager.setMark(location,true);
// }
//
/*
if(referenceStoreAdapter.isStoreValid()==false){
log.warn("The ReferenceStore is not valid - recovering ...");
recover();
log.info("Finished recovering the ReferenceStore");
}else {
Location location=writeTraceMessage("RECOVERED "+new Date(),true);
asyncDataManager.setMark(location,true);
}
*/
recover();
// Do a checkpoint periodically.
@ -431,6 +432,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException,IOException{
referenceStoreAdapter.clearMessages();
referenceStoreAdapter.recoverState();
Location pos=null;
int redoCounter=0;
log.info("Journal Recovery Started from: "+asyncDataManager);

View File

@ -275,8 +275,9 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
if(!initialized){
initialized=true;
if(this.directory==null){
this.directory=new File(IOHelper.getDefaultDataDirectory());
this.directory=new File(this.directory,brokerName+"-kahastore");
File file =new File(IOHelper.getDefaultDataDirectory());
file=new File(file,brokerName+"-kahastore");
setDirectory(file);
}
this.directory.mkdirs();
wireFormat.setCacheEnabled(false);

View File

@ -17,8 +17,9 @@
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -27,10 +28,14 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
@ -45,7 +50,9 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
private static final String RECORD_REFERENCES = "record-references";
private MapContainer stateMap;
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
private ListContainer durableSubscribers;
private boolean storeValid;
private Store stateStore;
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
@ -57,50 +64,37 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
@Override
public void start() throws Exception{
public synchronized void start() throws Exception{
super.start();
Store store=getStore();
Store store=getStateStore();
boolean empty=store.getMapContainerIds().isEmpty();
stateMap=store.getMapContainer("state",STORE_STATE);
stateMap.load();
if(!empty){
AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
if(status!=null){
storeValid=status.get();
}
if(storeValid){
if(stateMap.containsKey(RECORD_REFERENCES)){
recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
}
}else {
/*
log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
Set<ContainerId> set = store.getListContainerIds();
for (ContainerId cid:set) {
if (!cid.getDataContainerName().equals(STORE_STATE)) {
store.deleteListContainer(cid);
}
}
set = store.getMapContainerIds();
for (ContainerId cid:set) {
if (!cid.getDataContainerName().equals(STORE_STATE)) {
store.deleteMapContainer(cid);
}
}
*/
buildReferenceFileIdsInUse();
}
}
stateMap.put(STORE_STATE,new AtomicBoolean());
durableSubscribers = store.getListContainer("durableSubscribers");
durableSubscribers.setMarshaller(new CommandMarshaller());
}
@Override
public void stop() throws Exception {
public synchronized void stop() throws Exception {
stateMap.put(RECORD_REFERENCES,recordReferences);
stateMap.put(STORE_STATE,new AtomicBoolean(true));
if (this.stateStore != null) {
this.stateStore.close();
this.stateStore = null;
this.stateMap = null;
}
super.stop();
}
@ -194,6 +188,68 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
return recordReferences.keySet();
}
/**
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
*/
public void clearMessages() throws IOException{
deleteAllMessages();
}
/**
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/
public void recoverState() throws IOException{
for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
SubscriptionInfo info = (SubscriptionInfo)i.next();
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
}
}
@Override
public void setDirectory(File directory){
File file = new File(directory,"data");
super.setDirectory(file);
this.stateStore=createStateStore(directory);
}
protected synchronized Store getStateStore() throws IOException{
if(this.stateStore==null){
File stateDirectory=new File(getDirectory(),"kr-state");
stateDirectory.mkdirs();
this.stateStore=createStateStore(getDirectory());
}
return this.stateStore;
}
private Store createStateStore(File directory) {
File stateDirectory=new File(directory,"state");
stateDirectory.mkdirs();
try{
return StoreFactory.open(stateDirectory.getAbsolutePath(),"rw");
}catch(IOException e){
log.error("Failed to create the state store",e);
}
return null;
}
protected void addSubscriberState(SubscriptionInfo info) throws IOException {
durableSubscribers.add(info);
}
protected void removeSubscriberState(SubscriptionInfo info) {
durableSubscribers.remove(info);
}
}
}

View File

@ -153,6 +153,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
// to hang around
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
adapter.addSubscriberState(info);
}
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
@ -170,6 +171,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
SubscriptionInfo info = lookupSubscription(clientId,subscriptionName);
if (info != null) {
adapter.removeSubscriberState(info);
}
String key=getSubscriptionKey(clientId,subscriptionName);
removeSubscriberMessageContainer(key);
}