From 7a5abebea8e33e8b585a81b69755a6206bf963e3 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 16 Jun 2010 07:40:33 +0000 Subject: [PATCH] make concurrent store and dispatch with transactions optional git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955149 13f79535-47bb-0310-9956-ffa450edef68 --- .../kahadb/KahaDBPersistenceAdapter.java | 109 ++++++++++++------ .../activemq/store/kahadb/KahaDBStore.java | 15 +++ .../store/kahadb/KahaDBTransactionStore.java | 29 +++-- .../ft/TransactedTopicMasterSlaveTest.java | 18 +++ 4 files changed, 126 insertions(+), 45 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 3f7c711ea2..b1c02ae362 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -31,6 +31,7 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.usage.SystemUsage; + /** * An implementation of {@link PersistenceAdapter} designed for use with a * {@link Journal} and then check pointing asynchronously on a timeout with some @@ -41,7 +42,6 @@ import org.apache.activemq.usage.SystemUsage; */ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { private final KahaDBStore letter = new KahaDBStore(); - /** * @param context @@ -157,8 +157,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi this.letter.setBrokerName(brokerName); } - - /** * @param usageManager * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) @@ -193,6 +191,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the journalMaxFileLength + * * @return the journalMaxFileLength */ public int getJournalMaxFileLength() { @@ -200,8 +199,11 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } /** - * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" + * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can + * be used + * + * @org.apache.xbean.Property + * propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" */ public void setJournalMaxFileLength(int journalMaxFileLength) { this.letter.setJournalMaxFileLength(journalMaxFileLength); @@ -209,6 +211,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the checkpointInterval + * * @return the checkpointInterval */ public long getCheckpointInterval() { @@ -217,7 +220,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the checkpointInterval - * @param checkpointInterval the checkpointInterval to set + * + * @param checkpointInterval + * the checkpointInterval to set */ public void setCheckpointInterval(long checkpointInterval) { this.letter.setCheckpointInterval(checkpointInterval); @@ -225,6 +230,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the cleanupInterval + * * @return the cleanupInterval */ public long getCleanupInterval() { @@ -233,7 +239,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the cleanupInterval - * @param cleanupInterval the cleanupInterval to set + * + * @param cleanupInterval + * the cleanupInterval to set */ public void setCleanupInterval(long cleanupInterval) { this.letter.setCleanupInterval(cleanupInterval); @@ -241,6 +249,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the indexWriteBatchSize + * * @return the indexWriteBatchSize */ public int getIndexWriteBatchSize() { @@ -249,7 +258,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the indexWriteBatchSize - * @param indexWriteBatchSize the indexWriteBatchSize to set + * + * @param indexWriteBatchSize + * the indexWriteBatchSize to set */ public void setIndexWriteBatchSize(int indexWriteBatchSize) { this.letter.setIndexWriteBatchSize(indexWriteBatchSize); @@ -257,6 +268,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the journalMaxWriteBatchSize + * * @return the journalMaxWriteBatchSize */ public int getJournalMaxWriteBatchSize() { @@ -265,14 +277,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the journalMaxWriteBatchSize - * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set + * + * @param journalMaxWriteBatchSize + * the journalMaxWriteBatchSize to set */ public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); } - + /** * Get the enableIndexWriteAsync + * * @return the enableIndexWriteAsync */ public boolean isEnableIndexWriteAsync() { @@ -281,7 +296,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the enableIndexWriteAsync - * @param enableIndexWriteAsync the enableIndexWriteAsync to set + * + * @param enableIndexWriteAsync + * the enableIndexWriteAsync to set */ public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); @@ -289,12 +306,13 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the directory + * * @return the directory */ public File getDirectory() { return this.letter.getDirectory(); } - + /** * @param dir * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) @@ -305,6 +323,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Get the enableJournalDiskSyncs + * * @return the enableJournalDiskSyncs */ public boolean isEnableJournalDiskSyncs() { @@ -313,14 +332,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the enableJournalDiskSyncs - * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set + * + * @param enableJournalDiskSyncs + * the enableJournalDiskSyncs to set */ public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); } - + /** * Get the indexCacheSize + * * @return the indexCacheSize */ public int getIndexCacheSize() { @@ -329,14 +351,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the indexCacheSize - * @param indexCacheSize the indexCacheSize to set + * + * @param indexCacheSize + * the indexCacheSize to set */ public void setIndexCacheSize(int indexCacheSize) { this.letter.setIndexCacheSize(indexCacheSize); } - + /** * Get the ignoreMissingJournalfiles + * * @return the ignoreMissingJournalfiles */ public boolean isIgnoreMissingJournalfiles() { @@ -345,7 +370,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi /** * Set the ignoreMissingJournalfiles - * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set + * + * @param ignoreMissingJournalfiles + * the ignoreMissingJournalfiles to set */ public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); @@ -367,9 +394,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); } - public void setBrokerService(BrokerService brokerService) { - letter.setBrokerService(brokerService); - } + public void setBrokerService(BrokerService brokerService) { + letter.setBrokerService(brokerService); + } public boolean isArchiveDataLogs() { return letter.isArchiveDataLogs(); @@ -378,7 +405,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setArchiveDataLogs(boolean archiveDataLogs) { letter.setArchiveDataLogs(archiveDataLogs); } - + public File getDirectoryArchive() { return letter.getDirectoryArchive(); } @@ -386,36 +413,52 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setDirectoryArchive(File directoryArchive) { letter.setDirectoryArchive(directoryArchive); } - + public boolean isConcurrentStoreAndDispatchQueues() { return letter.isConcurrentStoreAndDispatchQueues(); } - + public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); - } - + } + public boolean isConcurrentStoreAndDispatchTopics() { return letter.isConcurrentStoreAndDispatchTopics(); } - + public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); - } - + } + + /** + * @return the concurrentStoreAndDispatchTransactions + */ + public boolean isConcurrentStoreAndDispatchTransactions() { + return letter.isConcurrentStoreAndDispatchTransactions(); + } + + /** + * @param concurrentStoreAndDispatchTransactions + * the concurrentStoreAndDispatchTransactions to set + */ + public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) { + letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions); + } + public int getMaxAsyncJobs() { return letter.getMaxAsyncJobs(); } /** - * @param maxAsyncJobs the maxAsyncJobs to set + * @param maxAsyncJobs + * the maxAsyncJobs to set */ public void setMaxAsyncJobs(int maxAsyncJobs) { - letter.setMaxAsyncJobs(maxAsyncJobs); - } - + letter.setMaxAsyncJobs(maxAsyncJobs); + } + @Override public String toString() { String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; - return "KahaDBPersistenceAdapter[" + path +"]" ; + return "KahaDBPersistenceAdapter[" + path + "]"; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 38a71531f7..3a8fe1bc34 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -96,6 +96,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ Semaphore globalTopicSemaphore; private boolean concurrentStoreAndDispatchQueues = true; private boolean concurrentStoreAndDispatchTopics = true; + private boolean concurrentStoreAndDispatchTransactions = true; private int maxAsyncJobs = MAX_ASYNC_JOBS; private final KahaDBTransactionStore transactionStore; @@ -143,6 +144,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{ public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; } + + /** + * @return the concurrentStoreAndDispatchTransactions + */ + public boolean isConcurrentStoreAndDispatchTransactions() { + return this.concurrentStoreAndDispatchTransactions; + } + + /** + * @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set + */ + public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) { + this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions; + } /** * @return the maxAsyncJobs diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index 8b80109fad..df4b2343e2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -49,6 +49,8 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Provides a TransactionStore implementation that can create transaction aware @@ -57,7 +59,7 @@ import org.apache.activemq.wireformat.WireFormat; * @version $Revision: 1.4 $ */ public class KahaDBTransactionStore implements TransactionStore { - + static final Log LOG = LogFactory.getLog(KahaDBTransactionStore.class); ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); private final WireFormat wireFormat = new OpenWireFormat(); private final KahaDBStore theStore; @@ -222,7 +224,7 @@ public class KahaDBTransactionStore implements TransactionStore { public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException { if (txid != null) { - if (!txid.isXATransaction()) { + if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { if (preCommit != null) { preCommit.run(); } @@ -262,7 +264,8 @@ public class KahaDBTransactionStore implements TransactionStore { KahaTransactionInfo info = getTransactionInfo(txid); theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit); } - + }else { + LOG.error("Null transaction passed on commit"); } } @@ -271,11 +274,11 @@ public class KahaDBTransactionStore implements TransactionStore { * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ public void rollback(TransactionId txid) throws IOException { - if (txid.isXATransaction()) { + if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()) { KahaTransactionInfo info = getTransactionInfo(txid); theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null); } else { - Object result = inflightTransactions.remove(txid); + inflightTransactions.remove(txid); } } @@ -323,7 +326,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction()) { + if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.addMessage(context, message); } else { Tx tx = getTx(message.getTransactionId()); @@ -349,8 +352,9 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction()) { - return destination.asyncAddQueueMessage(context, message); + if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { + destination.addMessage(context, message); + return AbstractMessageStore.FUTURE; } else { Tx tx = getTx(message.getTransactionId()); tx.add(new AddMessageCommand(context) { @@ -375,8 +379,9 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction()) { - return destination.asyncAddTopicMessage(context, message); + if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { + destination.addMessage(context, message); + return AbstractMessageStore.FUTURE; } else { Tx tx = getTx(message.getTransactionId()); tx.add(new AddMessageCommand(context) { @@ -405,7 +410,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction()) { + if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { destination.removeMessage(context, ack); } else { Tx tx = getTx(ack.getTransactionId()); @@ -431,7 +436,7 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction()) { + if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { destination.removeAsyncMessage(context, ack); } else { Tx tx = getTx(ack.getTransactionId()); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java index 802cfbded1..9494b31ad1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.broker.ft; +import java.io.File; import java.net.URISyntaxException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTopicTransactionTest; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.test.JmsResourceProvider; /** @@ -32,13 +34,19 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest { protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false"; private boolean stopMaster = false; + @Override protected void setUp() throws Exception { failureCount = super.batchCount / 2; // this will create the main (or master broker) broker = createBroker(); + File dir = new File ("target" + File.separator + "slave"); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dir); + adapter.setConcurrentStoreAndDispatchTransactions(false); broker.start(); slave = new BrokerService(); slave.setBrokerName("slave"); + slave.setPersistenceAdapter(adapter); slave.setDeleteAllMessagesOnStartup(true); slave.setMasterConnectorURI("tcp://localhost:62001"); slave.addConnector("tcp://localhost:62002"); @@ -53,26 +61,35 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest { reconnect(); } + @Override protected void tearDown() throws Exception { slave.stop(); slave = null; super.tearDown(); } + @Override protected BrokerService createBroker() throws Exception, URISyntaxException { + File dir = new File ("target" + File.separator + "master"); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dir); + adapter.setConcurrentStoreAndDispatchTransactions(false); BrokerService broker = new BrokerService(); broker.setBrokerName("master"); + broker.setPersistenceAdapter(adapter); broker.setDeleteAllMessagesOnStartup(true); broker.addConnector("tcp://localhost:62001"); return broker; } + @Override protected JmsResourceProvider getJmsResourceProvider() { JmsResourceProvider p = super.getJmsResourceProvider(); p.setServerUri(uriString); return p; } + @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory(uriString); } @@ -86,6 +103,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest { } } + @Override protected void messageSent() throws Exception { if (stopMaster) { if (++inflightMessageCount >= failureCount) {