diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 0d7feba6c2..261a04b5ea 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -56,7 +56,6 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.AbstractMessageStore; @@ -106,8 +105,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, protected ExecutorService queueExecutor; protected ExecutorService topicExecutor; - protected final List> asyncQueueMaps = new LinkedList>(); - protected final List> asyncTopicMaps = new LinkedList>(); + protected final List> asyncQueueMaps = new LinkedList<>(); + protected final List> asyncTopicMaps = new LinkedList<>(); final WireFormat wireFormat = new OpenWireFormat(); private SystemUsage usageManager; private LinkedBlockingQueue asyncQueueJobQueue; @@ -118,19 +117,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, // when true, message order may be compromised when cache is exhausted if store is out // or order w.r.t cache private boolean concurrentStoreAndDispatchTopics = false; - private final boolean concurrentStoreAndDispatchTransactions = false; private int maxAsyncJobs = MAX_ASYNC_JOBS; private final KahaDBTransactionStore transactionStore; private TransactionIdTransformer transactionIdTransformer; public KahaDBStore() { this.transactionStore = new KahaDBTransactionStore(this); - this.transactionIdTransformer = new TransactionIdTransformer() { - @Override - public TransactionId transform(TransactionId txid) { - return txid; - } - }; + this.transactionIdTransformer = txid -> txid; } @Override @@ -181,10 +174,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; } - public boolean isConcurrentStoreAndDispatchTransactions() { - return this.concurrentStoreAndDispatchTransactions; - } - /** * @return the maxAsyncJobs */ diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index b0f5c411d2..021a986e9a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -20,13 +20,8 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; @@ -62,7 +57,6 @@ import org.slf4j.LoggerFactory; */ public class KahaDBTransactionStore implements TransactionStore { static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); - ConcurrentMap inflightTransactions = new ConcurrentHashMap(); private final KahaDBStore theStore; public KahaDBTransactionStore(KahaDBStore theStore) { @@ -74,9 +68,9 @@ public class KahaDBTransactionStore implements TransactionStore { } public class Tx { - private final List messages = Collections.synchronizedList(new ArrayList()); + private final List messages = Collections.synchronizedList(new ArrayList<>()); - private final List acks = Collections.synchronizedList(new ArrayList()); + private final List acks = Collections.synchronizedList(new ArrayList<>()); public void add(AddMessageCommand msg) { messages.add(msg); @@ -89,8 +83,7 @@ public class KahaDBTransactionStore implements TransactionStore { public Message[] getMessages() { Message rc[] = new Message[messages.size()]; int count = 0; - for (Iterator iter = messages.iterator(); iter.hasNext();) { - AddMessageCommand cmd = iter.next(); + for (AddMessageCommand cmd : messages) { rc[count++] = cmd.getMessage(); } return rc; @@ -99,8 +92,7 @@ public class KahaDBTransactionStore implements TransactionStore { public MessageAck[] getAcks() { MessageAck rc[] = new MessageAck[acks.size()]; int count = 0; - for (Iterator iter = acks.iterator(); iter.hasNext();) { - RemoveMessageCommand cmd = iter.next(); + for (RemoveMessageCommand cmd : acks) { rc[count++] = cmd.getMessageAck(); } return rc; @@ -111,16 +103,14 @@ public class KahaDBTransactionStore implements TransactionStore { * @throws IOException */ public List> commit() throws IOException { - List> results = new ArrayList>(); + List> results = new ArrayList<>(); // Do all the message adds. - for (Iterator iter = messages.iterator(); iter.hasNext();) { - AddMessageCommand cmd = iter.next(); + for (AddMessageCommand cmd : messages) { results.add(cmd.run()); } // And removes.. - for (Iterator iter = acks.iterator(); iter.hasNext();) { - RemoveMessageCommand cmd = iter.next(); + for (RemoveMessageCommand cmd : acks) { cmd.run(); results.add(cmd.run()); } @@ -129,7 +119,7 @@ public class KahaDBTransactionStore implements TransactionStore { } } - public abstract class AddMessageCommand { + public abstract static class AddMessageCommand { private final ConnectionContext ctx; AddMessageCommand(ConnectionContext ctx) { this.ctx = ctx; @@ -141,7 +131,7 @@ public class KahaDBTransactionStore implements TransactionStore { abstract Future run(ConnectionContext ctx) throws IOException; } - public abstract class RemoveMessageCommand { + public abstract static class RemoveMessageCommand { private final ConnectionContext ctx; RemoveMessageCommand(ConnectionContext ctx) { @@ -237,78 +227,20 @@ public class KahaDBTransactionStore implements TransactionStore { @Override public void prepare(TransactionId txid) throws IOException { KahaTransactionInfo info = getTransactionInfo(txid); - if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { - theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); - } else { - Tx tx = inflightTransactions.remove(txid); - if (tx != null) { - theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); - } - } - } - - public Tx getTx(Object txid) { - Tx tx = inflightTransactions.get(txid); - if (tx == null) { - synchronized (inflightTransactions) { - tx = inflightTransactions.get(txid); - if (tx == null) { - tx = new Tx(); - inflightTransactions.put(txid, tx); - } - } - } - return tx; + theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); } @Override public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) throws IOException { if (txid != null) { - if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { - if (preCommit != null) { - preCommit.run(); - } - Tx tx = inflightTransactions.remove(txid); - if (tx != null) { - List> results = tx.commit(); - boolean doneSomething = false; - for (Future result : results) { - try { - result.get(); - } catch (InterruptedException e) { - theStore.brokerService.handleIOException(new IOException(e.getMessage())); - } catch (ExecutionException e) { - theStore.brokerService.handleIOException(new IOException(e.getMessage())); - }catch(CancellationException e) { - } - if (!result.isCancelled()) { - doneSomething = true; - } - } - if (postCommit != null) { - postCommit.run(); - } - if (doneSomething) { - KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); - } - }else { - //The Tx will be null for failed over clients - lets run their post commits - if (postCommit != null) { - postCommit.run(); - } - } - - } else { - KahaTransactionInfo info = getTransactionInfo(txid); - if (preCommit != null) { - preCommit.run(); - } - theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); - forgetRecoveredAcks(txid, false); + KahaTransactionInfo info = getTransactionInfo(txid); + if (preCommit != null) { + preCommit.run(); } - }else { + theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); + forgetRecoveredAcks(txid, false); + } else { LOG.error("Null transaction passed on commit"); } } @@ -319,13 +251,9 @@ public class KahaDBTransactionStore implements TransactionStore { */ @Override public void rollback(TransactionId txid) throws IOException { - if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { - KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); - forgetRecoveredAcks(txid, true); - } else { - inflightTransactions.remove(txid); - } + KahaTransactionInfo info = getTransactionInfo(txid); + theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); + forgetRecoveredAcks(txid, true); } protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { @@ -347,8 +275,8 @@ public class KahaDBTransactionStore implements TransactionStore { public synchronized void recover(TransactionRecoveryListener listener) throws IOException { for (Map.Entry> entry : theStore.preparedTransactions.entrySet()) { XATransactionId xid = (XATransactionId) entry.getKey(); - ArrayList messageList = new ArrayList(); - ArrayList ackList = new ArrayList(); + ArrayList messageList = new ArrayList<>(); + ArrayList ackList = new ArrayList<>(); for (Operation op : entry.getValue()) { if (op.getClass() == MessageDatabase.AddOperation.class) { @@ -384,52 +312,14 @@ public class KahaDBTransactionStore implements TransactionStore { */ void addMessage(ConnectionContext context, final MessageStore destination, final Message message) throws IOException { - - if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { - destination.addMessage(context, message); - } else { - Tx tx = getTx(message.getTransactionId()); - tx.add(new AddMessageCommand(context) { - @Override - public Message getMessage() { - return message; - } - @Override - public Future run(ConnectionContext ctx) throws IOException { - destination.addMessage(ctx, message); - return AbstractMessageStore.FUTURE; - } - - }); - } - } else { - destination.addMessage(context, message); - } + destination.addMessage(context, message); } ListenableFuture asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) throws IOException { - if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { - destination.addMessage(context, message); - return AbstractMessageStore.FUTURE; - } else { - Tx tx = getTx(message.getTransactionId()); - tx.add(new AddMessageCommand(context) { - @Override - public Message getMessage() { - return message; - } - @Override - public Future run(ConnectionContext ctx) throws IOException { - return destination.asyncAddQueueMessage(ctx, message); - } - - }); - return AbstractMessageStore.FUTURE; - } + destination.addMessage(context, message); + return AbstractMessageStore.FUTURE; } else { return destination.asyncAddQueueMessage(context, message); } @@ -439,24 +329,8 @@ public class KahaDBTransactionStore implements TransactionStore { throws IOException { if (message.getTransactionId() != null) { - if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { - destination.addMessage(context, message); - return AbstractMessageStore.FUTURE; - } else { - Tx tx = getTx(message.getTransactionId()); - tx.add(new AddMessageCommand(context) { - @Override - public Message getMessage() { - return message; - } - @Override - public Future run(ConnectionContext ctx) throws IOException { - return destination.asyncAddTopicMessage(ctx, message); - } - - }); - return AbstractMessageStore.FUTURE; - } + destination.addMessage(context, message); + return AbstractMessageStore.FUTURE; } else { return destination.asyncAddTopicMessage(context, message); } @@ -468,80 +342,17 @@ public class KahaDBTransactionStore implements TransactionStore { */ final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) throws IOException { - - if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { - destination.removeMessage(context, ack); - } else { - Tx tx = getTx(ack.getTransactionId()); - tx.add(new RemoveMessageCommand(context) { - @Override - public MessageAck getMessageAck() { - return ack; - } - - @Override - public Future run(ConnectionContext ctx) throws IOException { - destination.removeMessage(ctx, ack); - return AbstractMessageStore.FUTURE; - } - }); - } - } else { - destination.removeMessage(context, ack); - } + destination.removeMessage(context, ack); } final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) throws IOException { - - if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { - destination.removeAsyncMessage(context, ack); - } else { - Tx tx = getTx(ack.getTransactionId()); - tx.add(new RemoveMessageCommand(context) { - @Override - public MessageAck getMessageAck() { - return ack; - } - - @Override - public Future run(ConnectionContext ctx) throws IOException { - destination.removeMessage(ctx, ack); - return AbstractMessageStore.FUTURE; - } - }); - } - } else { - destination.removeAsyncMessage(context, ack); - } + destination.removeAsyncMessage(context, ack); } final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck ack) throws IOException { - - if (ack.isInTransaction()) { - if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { - destination.acknowledge(context, clientId, subscriptionName, messageId, ack); - } else { - Tx tx = getTx(ack.getTransactionId()); - tx.add(new RemoveMessageCommand(context) { - @Override - public MessageAck getMessageAck() { - return ack; - } - - @Override - public Future run(ConnectionContext ctx) throws IOException { - destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); - return AbstractMessageStore.FUTURE; - } - }); - } - } else { - destination.acknowledge(context, clientId, subscriptionName, messageId, ack); - } + destination.acknowledge(context, clientId, subscriptionName, messageId, ack); }