revert http://fisheye6.atlassian.com/changelog/activemq/?cs=958009 - concurrent dispatch with transaction completion breaks when the cache is disabled, either through memory limit or through durable subs with existing messages in the store. cannot see the use case though as the transaction semantic is broken if a message can get dispatched but a commit can still fail. With dispatch ocurring postCommit like before, the option for kahaDB to do concurrent transaction dispatch is disabled

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@982240 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-08-04 12:56:42 +00:00
parent 6f7e3fc1d5
commit 0af6b063e7
6 changed files with 4 additions and 33 deletions

View File

@ -679,7 +679,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@Override @Override
public void beforeCommit() throws Exception { public void afterCommit() throws Exception {
sendLock.lockInterruptibly(); sendLock.lockInterruptibly();
try { try {
// It could take while before we receive the commit // It could take while before we receive the commit

View File

@ -452,7 +452,7 @@ public class Topic extends BaseDestination implements Task {
if (context.isInTransaction()) { if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@Override @Override
public void beforeCommit() throws Exception { public void afterCommit() throws Exception {
// It could take while before we receive the commit // It could take while before we receive the commit
// operration.. by that time the message could have // operration.. by that time the message could have
// expired.. // expired..

View File

@ -460,21 +460,6 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
} }
/**
* @return the concurrentStoreAndDispatchTransactions
*/
public boolean isConcurrentStoreAndDispatchTransactions() {
return letter.isConcurrentStoreAndDispatchTransactions();
}
/**
* @param concurrentStoreAndDispatchTransactions
* the concurrentStoreAndDispatchTransactions to set
*/
public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions);
}
public int getMaxAsyncJobs() { public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs(); return letter.getMaxAsyncJobs();
} }

View File

@ -106,7 +106,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
Semaphore globalTopicSemaphore; Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true; private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true; private boolean concurrentStoreAndDispatchTopics = true;
private boolean concurrentStoreAndDispatchTransactions = true; private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS; private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore; private final KahaDBTransactionStore transactionStore;
@ -155,21 +155,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
} }
/**
* @return the concurrentStoreAndDispatchTransactions
*/
public boolean isConcurrentStoreAndDispatchTransactions() { public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions; return this.concurrentStoreAndDispatchTransactions;
} }
/**
* @param concurrentStoreAndDispatchTransactions
* the concurrentStoreAndDispatchTransactions to set
*/
public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
}
/** /**
* @return the maxAsyncJobs * @return the maxAsyncJobs
*/ */

View File

@ -217,7 +217,6 @@ public class KahaDBTransactionStore implements TransactionStore {
} }
/** /**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/ */
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)

View File

@ -42,7 +42,6 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
File dir = new File ("target" + File.separator + "slave"); File dir = new File ("target" + File.separator + "slave");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir); adapter.setDirectory(dir);
adapter.setConcurrentStoreAndDispatchTransactions(false);
broker.start(); broker.start();
slave = new BrokerService(); slave = new BrokerService();
slave.setBrokerName("slave"); slave.setBrokerName("slave");
@ -73,7 +72,6 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
File dir = new File ("target" + File.separator + "master"); File dir = new File ("target" + File.separator + "master");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir); adapter.setDirectory(dir);
adapter.setConcurrentStoreAndDispatchTransactions(false);
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName("master"); broker.setBrokerName("master");
broker.setPersistenceAdapter(adapter); broker.setPersistenceAdapter(adapter);