persist in-progress XA transactions - in order to speed up recovery

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@557391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-18 20:34:19 +00:00
parent 2438ada6ad
commit 993f78caac
2 changed files with 120 additions and 116 deletions

View File

@ -11,7 +11,6 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
@ -49,8 +48,6 @@ import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
@ -180,11 +177,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// The following was attempting to reduce startup times by avoiding the log
// file scanning that recovery performs. The problem with it is that XA transactions
// only live in transaction log and are not stored in the reference store, but they still
// need to be recovered when the broker starts up. Perhaps on a graceful shutdown we
// should record all the in flight XA transactions to a file to avoid having to scan
// the entire transaction log. For now going to comment this bit out.
//
/*
// need to be recovered when the broker starts up.
if(referenceStoreAdapter.isStoreValid()==false){
log.warn("The ReferenceStore is not valid - recovering ...");
recover();
@ -192,10 +186,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}else {
Location location=writeTraceMessage("RECOVERED "+new Date(),true);
asyncDataManager.setMark(location,true);
//recover transactions
getTransactionStore().setPreparedTransactions(referenceStoreAdapter.retrievePreparedState());
}
*/
recover();
// Do a checkpoint periodically.
periodicCheckpointTask=new Runnable(){
@ -237,6 +232,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
synchronized(this){
checkpointTask.shutdown();
}
referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
queues.clear();
topics.clear();
IOException firstException=null;
@ -355,13 +351,15 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
if(destination.isQueue()){
return createQueueMessageStore((ActiveMQQueue)destination);
}else{
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
AMQMessageStore store=queues.get(destination);
@ -494,28 +492,16 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
AMQTx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
if(tx==null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
TxOperation op=(TxOperation)iter.next();
if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
if(op.store.replayAddMessage(context,(Message)op.data,op.location))
redoCounter++;
}
if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
redoCounter++;
}
if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
JournalTopicAck ack=(JournalTopicAck)op.data;
if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
.getSubscritionName(),ack.getMessageId())){
redoCounter++;
}
AMQTxOperation op=(AMQTxOperation)iter.next();
if (op.replay(this,context)) {
redoCounter++;
}
}
break;

View File

@ -20,8 +20,10 @@ package org.apache.activemq.store.kahadaptor;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ActiveMQDestination;
@ -29,6 +31,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@ -41,30 +44,33 @@ import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.amq.AMQTx;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.groovy.antlr.treewalker.PreOrderTraversal;
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String RECORD_REFERENCES = "record-references";
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter{
private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE="store-state";
private static final String RECORD_REFERENCES="record-references";
private static final String TRANSACTIONS="transactions-state";
private MapContainer stateMap;
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
private MapContainer preparedTransactions;
private Map<Integer,AtomicInteger> recordReferences=new HashMap<Integer,AtomicInteger>();
private ListContainer durableSubscribers;
private boolean storeValid;
private Store stateStore;
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
throw new RuntimeException("Use createQueueReferenceStore instead");
throw new RuntimeException("Use createQueueReferenceStore instead");
}
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
throw new RuntimeException("Use createTopicReferenceStore instead");
throw new RuntimeException("Use createTopicReferenceStore instead");
}
@Override
public synchronized void start() throws Exception{
@Override public synchronized void start() throws Exception{
super.start();
Store store=getStateStore();
boolean empty=store.getMapContainerIds().isEmpty();
@ -82,43 +88,44 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
}
stateMap.put(STORE_STATE,new AtomicBoolean());
durableSubscribers = store.getListContainer("durableSubscribers");
durableSubscribers=store.getListContainer("durableSubscribers");
durableSubscribers.setMarshaller(new CommandMarshaller());
preparedTransactions=store.getMapContainer("transactions",TRANSACTIONS,false);
//need to set the Marshallers here
preparedTransactions.setKeyMarshaller(Store.CommandMarshaller);
preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
}
@Override
public synchronized void stop() throws Exception {
@Override public synchronized void stop() throws Exception{
stateMap.put(RECORD_REFERENCES,recordReferences);
stateMap.put(STORE_STATE,new AtomicBoolean(true));
if (this.stateStore != null) {
if(this.stateStore!=null){
this.stateStore.close();
this.stateStore = null;
this.stateMap = null;
this.stateStore=null;
this.stateMap=null;
}
super.stop();
super.stop();
}
public boolean isStoreValid() {
public boolean isStoreValid(){
return storeValid;
}
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
ReferenceStore rc=(ReferenceStore)queues.get(destination);
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException{
ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
queues.put(destination,rc);
}
return rc;
}
}
public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException{
TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
if(rc==null){
Store store=getStore();
MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
@ -127,54 +134,52 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
// }
topics.put(destination,rc);
}
return rc;
}
}
public void buildReferenceFileIdsInUse() throws IOException {
recordReferences = new HashMap<Integer,AtomicInteger>();
Set<ActiveMQDestination> destinations = getDestinations();
for (ActiveMQDestination destination : destinations) {
if( destination.isQueue() ) {
KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
store.addReferenceFileIdsInUse();
} else {
KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
store.addReferenceFileIdsInUse();
}
}
}
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
public void buildReferenceFileIdsInUse() throws IOException{
recordReferences=new HashMap<Integer,AtomicInteger>();
Set<ActiveMQDestination> destinations=getDestinations();
for(ActiveMQDestination destination:destinations){
if(destination.isQueue()){
KahaReferenceStore store=(KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
store.addReferenceFileIdsInUse();
}else{
KahaTopicReferenceStore store=(KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
store.addReferenceFileIdsInUse();
}
}
}
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName)
throws IOException{
Store store=getStore();
MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
MapContainer<MessageId,ReferenceRecord> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
container.setValueMarshaller(new ReferenceRecordMarshaller());
container.setValueMarshaller(new ReferenceRecordMarshaller());
container.load();
return container;
}
synchronized void addInterestInRecordFile(int recordNumber) {
Integer key = Integer.valueOf(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr == null) {
rr = new AtomicInteger();
synchronized void addInterestInRecordFile(int recordNumber){
Integer key=Integer.valueOf(recordNumber);
AtomicInteger rr=recordReferences.get(key);
if(rr==null){
rr=new AtomicInteger();
recordReferences.put(key,rr);
}
rr.incrementAndGet();
}
synchronized void removeInterestInRecordFile(int recordNumber) {
Integer key = Integer.valueOf(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr != null && rr.decrementAndGet() <= 0) {
synchronized void removeInterestInRecordFile(int recordNumber){
Integer key=Integer.valueOf(recordNumber);
AtomicInteger rr=recordReferences.get(key);
if(rr!=null&&rr.decrementAndGet()<=0){
recordReferences.remove(key);
}
}
@ -196,28 +201,45 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
public void clearMessages() throws IOException{
deleteAllMessages();
}
/**
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/
public void recoverState() throws IOException{
for (Iterator i = durableSubscribers.iterator();i.hasNext();) {
SubscriptionInfo info = (SubscriptionInfo)i.next();
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
for(Iterator i=durableSubscribers.iterator();i.hasNext();){
SubscriptionInfo info=(SubscriptionInfo)i.next();
TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info.getClientId(),info.getSubcriptionName(),info.getSelector(),false);
}
}
@Override
public synchronized void setDirectory(File directory){
File file = new File(directory,"data");
public Map<TransactionId,AMQTx> retrievePreparedState() throws IOException{
Map<TransactionId,AMQTx> result=new HashMap<TransactionId,AMQTx>();
preparedTransactions.load();
for(Iterator i=preparedTransactions.keySet().iterator();i.hasNext();){
TransactionId key=(TransactionId)i.next();
AMQTx value=(AMQTx)preparedTransactions.get(key);
result.put(key,value);
}
return result;
}
public void savePreparedState(Map<TransactionId,AMQTx> map) throws IOException{
preparedTransactions.clear();
for(Iterator<Map.Entry<TransactionId,AMQTx>> iter=map.entrySet().iterator();iter.hasNext();){
Map.Entry<TransactionId,AMQTx> entry=iter.next();
preparedTransactions.put(entry.getKey(),entry.getValue());
}
}
@Override public synchronized void setDirectory(File directory){
File file=new File(directory,"data");
super.setDirectory(file);
this.stateStore=createStateStore(directory);
}
protected synchronized Store getStateStore() throws IOException{
if(this.stateStore==null){
File stateDirectory=new File(getDirectory(),"kr-state");
@ -226,8 +248,8 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
return this.stateStore;
}
private Store createStateStore(File directory) {
private Store createStateStore(File directory){
File stateDirectory=new File(directory,"state");
stateDirectory.mkdirs();
try{
@ -236,19 +258,15 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
log.error("Failed to create the state store",e);
}
return null;
}
protected void addSubscriberState(SubscriptionInfo info) throws IOException {
durableSubscribers.add(info);
}
protected void removeSubscriberState(SubscriptionInfo info) {
durableSubscribers.remove(info);
}
protected void addSubscriberState(SubscriptionInfo info) throws IOException{
durableSubscribers.add(info);
}
protected void removeSubscriberState(SubscriptionInfo info){
durableSubscribers.remove(info);
}
}