This commit is contained in:
Dejan Bosanac 2015-04-14 14:55:11 +02:00
parent 2852a8bdb1
commit f556076a2c
1 changed files with 5 additions and 32 deletions

View File

@ -64,7 +64,6 @@ public class TransactionBroker extends BrokerFilter {
// The prepared XA transactions. // The prepared XA transactions.
private TransactionStore transactionStore; private TransactionStore transactionStore;
private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>(); private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
private ActiveMQMessageAudit audit;
public TransactionBroker(Broker next, TransactionStore transactionStore) { public TransactionBroker(Broker next, TransactionStore transactionStore) {
super(next); super(next);
@ -286,33 +285,14 @@ public class TransactionBroker extends BrokerFilter {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction(); Transaction originalTx = context.getTransaction();
Transaction transaction = null; Transaction transaction = null;
Synchronization sync = null;
if (message.getTransactionId() != null) { if (message.getTransactionId() != null) {
transaction = getTransaction(context, message.getTransactionId(), false); transaction = getTransaction(context, message.getTransactionId(), false);
if (transaction != null) {
sync = new Synchronization() {
public void afterRollback() {
if (audit != null) {
audit.rollback(message);
}
}
};
transaction.addSynchronization(sync);
}
} }
if (audit == null || !audit.isDuplicate(message)) { context.setTransaction(transaction);
context.setTransaction(transaction); try {
try { next.send(producerExchange, message);
next.send(producerExchange, message); } finally {
} finally { context.setTransaction(originalTx);
context.setTransaction(originalTx);
}
} else {
if (sync != null && transaction != null) {
transaction.removeSynchronization(sync);
}
LOG.debug("IGNORING duplicate message {}", message);
} }
} }
@ -378,11 +358,4 @@ public class TransactionBroker extends BrokerFilter {
} }
} }
public synchronized void brokerServiceStarted() {
super.brokerServiceStarted();
if (audit == null) {
audit = new ActiveMQMessageAudit();
}
}
} }