make concurrent store and dispatch with transactions optional

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-16 07:40:33 +00:00
parent 27262c8463
commit 7a5abebea8
4 changed files with 126 additions and 45 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@ -41,7 +42,6 @@ import org.apache.activemq.usage.SystemUsage;
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private final KahaDBStore letter = new KahaDBStore();
/**
* @param context
@ -157,8 +157,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
this.letter.setBrokerName(brokerName);
}
/**
* @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
@ -193,6 +191,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the journalMaxFileLength
*
* @return the journalMaxFileLength
*/
public int getJournalMaxFileLength() {
@ -200,8 +199,11 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
}
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used
*
* @org.apache.xbean.Property
* propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.letter.setJournalMaxFileLength(journalMaxFileLength);
@ -209,6 +211,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the checkpointInterval
*
* @return the checkpointInterval
*/
public long getCheckpointInterval() {
@ -217,7 +220,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the checkpointInterval
* @param checkpointInterval the checkpointInterval to set
*
* @param checkpointInterval
* the checkpointInterval to set
*/
public void setCheckpointInterval(long checkpointInterval) {
this.letter.setCheckpointInterval(checkpointInterval);
@ -225,6 +230,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the cleanupInterval
*
* @return the cleanupInterval
*/
public long getCleanupInterval() {
@ -233,7 +239,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the cleanupInterval
* @param cleanupInterval the cleanupInterval to set
*
* @param cleanupInterval
* the cleanupInterval to set
*/
public void setCleanupInterval(long cleanupInterval) {
this.letter.setCleanupInterval(cleanupInterval);
@ -241,6 +249,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the indexWriteBatchSize
*
* @return the indexWriteBatchSize
*/
public int getIndexWriteBatchSize() {
@ -249,7 +258,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the indexWriteBatchSize
* @param indexWriteBatchSize the indexWriteBatchSize to set
*
* @param indexWriteBatchSize
* the indexWriteBatchSize to set
*/
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@ -257,6 +268,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the journalMaxWriteBatchSize
*
* @return the journalMaxWriteBatchSize
*/
public int getJournalMaxWriteBatchSize() {
@ -265,14 +277,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the journalMaxWriteBatchSize
* @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
*
* @param journalMaxWriteBatchSize
* the journalMaxWriteBatchSize to set
*/
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
}
/**
* Get the enableIndexWriteAsync
*
* @return the enableIndexWriteAsync
*/
public boolean isEnableIndexWriteAsync() {
@ -281,7 +296,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the enableIndexWriteAsync
* @param enableIndexWriteAsync the enableIndexWriteAsync to set
*
* @param enableIndexWriteAsync
* the enableIndexWriteAsync to set
*/
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@ -289,12 +306,13 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the directory
*
* @return the directory
*/
public File getDirectory() {
return this.letter.getDirectory();
}
/**
* @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
@ -305,6 +323,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Get the enableJournalDiskSyncs
*
* @return the enableJournalDiskSyncs
*/
public boolean isEnableJournalDiskSyncs() {
@ -313,14 +332,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the enableJournalDiskSyncs
* @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
*
* @param enableJournalDiskSyncs
* the enableJournalDiskSyncs to set
*/
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
}
/**
* Get the indexCacheSize
*
* @return the indexCacheSize
*/
public int getIndexCacheSize() {
@ -329,14 +351,17 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the indexCacheSize
* @param indexCacheSize the indexCacheSize to set
*
* @param indexCacheSize
* the indexCacheSize to set
*/
public void setIndexCacheSize(int indexCacheSize) {
this.letter.setIndexCacheSize(indexCacheSize);
}
/**
* Get the ignoreMissingJournalfiles
*
* @return the ignoreMissingJournalfiles
*/
public boolean isIgnoreMissingJournalfiles() {
@ -345,7 +370,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
/**
* Set the ignoreMissingJournalfiles
* @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
*
* @param ignoreMissingJournalfiles
* the ignoreMissingJournalfiles to set
*/
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@ -367,9 +394,9 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
public void setBrokerService(BrokerService brokerService) {
letter.setBrokerService(brokerService);
}
public void setBrokerService(BrokerService brokerService) {
letter.setBrokerService(brokerService);
}
public boolean isArchiveDataLogs() {
return letter.isArchiveDataLogs();
@ -378,7 +405,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public void setArchiveDataLogs(boolean archiveDataLogs) {
letter.setArchiveDataLogs(archiveDataLogs);
}
public File getDirectoryArchive() {
return letter.getDirectoryArchive();
}
@ -386,36 +413,52 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public void setDirectoryArchive(File directoryArchive) {
letter.setDirectoryArchive(directoryArchive);
}
public boolean isConcurrentStoreAndDispatchQueues() {
return letter.isConcurrentStoreAndDispatchQueues();
}
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
}
}
public boolean isConcurrentStoreAndDispatchTopics() {
return letter.isConcurrentStoreAndDispatchTopics();
}
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
}
}
/**
* @return the concurrentStoreAndDispatchTransactions
*/
public boolean isConcurrentStoreAndDispatchTransactions() {
return letter.isConcurrentStoreAndDispatchTransactions();
}
/**
* @param concurrentStoreAndDispatchTransactions
* the concurrentStoreAndDispatchTransactions to set
*/
public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions);
}
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
/**
* @param maxAsyncJobs the maxAsyncJobs to set
* @param maxAsyncJobs
* the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
letter.setMaxAsyncJobs(maxAsyncJobs);
}
letter.setMaxAsyncJobs(maxAsyncJobs);
}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path +"]" ;
return "KahaDBPersistenceAdapter[" + path + "]";
}
}

View File

@ -96,6 +96,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private boolean concurrentStoreAndDispatchTransactions = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
@ -143,6 +144,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
/**
* @return the concurrentStoreAndDispatchTransactions
*/
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
/**
* @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set
*/
public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
}
/**
* @return the maxAsyncJobs

View File

@ -49,6 +49,8 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
@ -57,7 +59,7 @@ import org.apache.activemq.wireformat.WireFormat;
* @version $Revision: 1.4 $
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Log LOG = LogFactory.getLog(KahaDBTransactionStore.class);
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final WireFormat wireFormat = new OpenWireFormat();
private final KahaDBStore theStore;
@ -222,7 +224,7 @@ public class KahaDBTransactionStore implements TransactionStore {
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
if (!txid.isXATransaction()) {
if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
if (preCommit != null) {
preCommit.run();
}
@ -262,7 +264,8 @@ public class KahaDBTransactionStore implements TransactionStore {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
}
}else {
LOG.error("Null transaction passed on commit");
}
}
@ -271,11 +274,11 @@ public class KahaDBTransactionStore implements TransactionStore {
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction()) {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
} else {
Object result = inflightTransactions.remove(txid);
inflightTransactions.remove(txid);
}
}
@ -323,7 +326,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.addMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
@ -349,8 +352,9 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
return destination.asyncAddQueueMessage(context, message);
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@ -375,8 +379,9 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
return destination.asyncAddTopicMessage(context, message);
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@ -405,7 +410,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException {
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@ -431,7 +436,7 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException {
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());

View File

@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.ft;
import java.io.File;
import java.net.URISyntaxException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicTransactionTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.test.JmsResourceProvider;
/**
@ -32,13 +34,19 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false";
private boolean stopMaster = false;
@Override
protected void setUp() throws Exception {
failureCount = super.batchCount / 2;
// this will create the main (or master broker)
broker = createBroker();
File dir = new File ("target" + File.separator + "slave");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir);
adapter.setConcurrentStoreAndDispatchTransactions(false);
broker.start();
slave = new BrokerService();
slave.setBrokerName("slave");
slave.setPersistenceAdapter(adapter);
slave.setDeleteAllMessagesOnStartup(true);
slave.setMasterConnectorURI("tcp://localhost:62001");
slave.addConnector("tcp://localhost:62002");
@ -53,26 +61,35 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
reconnect();
}
@Override
protected void tearDown() throws Exception {
slave.stop();
slave = null;
super.tearDown();
}
@Override
protected BrokerService createBroker() throws Exception, URISyntaxException {
File dir = new File ("target" + File.separator + "master");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir);
adapter.setConcurrentStoreAndDispatchTransactions(false);
BrokerService broker = new BrokerService();
broker.setBrokerName("master");
broker.setPersistenceAdapter(adapter);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:62001");
return broker;
}
@Override
protected JmsResourceProvider getJmsResourceProvider() {
JmsResourceProvider p = super.getJmsResourceProvider();
p.setServerUri(uriString);
return p;
}
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
@ -86,6 +103,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
}
}
@Override
protected void messageSent() throws Exception {
if (stopMaster) {
if (++inflightMessageCount >= failureCount) {