[NO JIRA] Removing unused concurrentStoreAndDispatchTransactions variable from KahaDBStore. Cleaning up related unused code.

This commit is contained in:
Dmytro Chebotarskyi 2022-08-30 15:14:45 -07:00
parent 8c74765618
commit 73f827f9a6
2 changed files with 32 additions and 232 deletions

View File

@ -56,7 +56,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.AbstractMessageStore;
@ -106,8 +105,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
protected ExecutorService queueExecutor; protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor; protected ExecutorService topicExecutor;
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<>();
final WireFormat wireFormat = new OpenWireFormat(); final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager; private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@ -118,19 +117,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
// when true, message order may be compromised when cache is exhausted if store is out // when true, message order may be compromised when cache is exhausted if store is out
// or order w.r.t cache // or order w.r.t cache
private boolean concurrentStoreAndDispatchTopics = false; private boolean concurrentStoreAndDispatchTopics = false;
private final boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS; private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore; private final KahaDBTransactionStore transactionStore;
private TransactionIdTransformer transactionIdTransformer; private TransactionIdTransformer transactionIdTransformer;
public KahaDBStore() { public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this); this.transactionStore = new KahaDBTransactionStore(this);
this.transactionIdTransformer = new TransactionIdTransformer() { this.transactionIdTransformer = txid -> txid;
@Override
public TransactionId transform(TransactionId txid) {
return txid;
}
};
} }
@Override @Override
@ -181,10 +174,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
} }
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
/** /**
* @return the maxAsyncJobs * @return the maxAsyncJobs
*/ */

View File

@ -20,13 +20,8 @@ import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -62,7 +57,6 @@ import org.slf4j.LoggerFactory;
*/ */
public class KahaDBTransactionStore implements TransactionStore { public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final KahaDBStore theStore; private final KahaDBStore theStore;
public KahaDBTransactionStore(KahaDBStore theStore) { public KahaDBTransactionStore(KahaDBStore theStore) {
@ -74,9 +68,9 @@ public class KahaDBTransactionStore implements TransactionStore {
} }
public class Tx { public class Tx {
private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>()); private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<>());
private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>()); private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<>());
public void add(AddMessageCommand msg) { public void add(AddMessageCommand msg) {
messages.add(msg); messages.add(msg);
@ -89,8 +83,7 @@ public class KahaDBTransactionStore implements TransactionStore {
public Message[] getMessages() { public Message[] getMessages() {
Message rc[] = new Message[messages.size()]; Message rc[] = new Message[messages.size()];
int count = 0; int count = 0;
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { for (AddMessageCommand cmd : messages) {
AddMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessage(); rc[count++] = cmd.getMessage();
} }
return rc; return rc;
@ -99,8 +92,7 @@ public class KahaDBTransactionStore implements TransactionStore {
public MessageAck[] getAcks() { public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()]; MessageAck rc[] = new MessageAck[acks.size()];
int count = 0; int count = 0;
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { for (RemoveMessageCommand cmd : acks) {
RemoveMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessageAck(); rc[count++] = cmd.getMessageAck();
} }
return rc; return rc;
@ -111,16 +103,14 @@ public class KahaDBTransactionStore implements TransactionStore {
* @throws IOException * @throws IOException
*/ */
public List<Future<Object>> commit() throws IOException { public List<Future<Object>> commit() throws IOException {
List<Future<Object>> results = new ArrayList<Future<Object>>(); List<Future<Object>> results = new ArrayList<>();
// Do all the message adds. // Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { for (AddMessageCommand cmd : messages) {
AddMessageCommand cmd = iter.next();
results.add(cmd.run()); results.add(cmd.run());
} }
// And removes.. // And removes..
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { for (RemoveMessageCommand cmd : acks) {
RemoveMessageCommand cmd = iter.next();
cmd.run(); cmd.run();
results.add(cmd.run()); results.add(cmd.run());
} }
@ -129,7 +119,7 @@ public class KahaDBTransactionStore implements TransactionStore {
} }
} }
public abstract class AddMessageCommand { public abstract static class AddMessageCommand {
private final ConnectionContext ctx; private final ConnectionContext ctx;
AddMessageCommand(ConnectionContext ctx) { AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx; this.ctx = ctx;
@ -141,7 +131,7 @@ public class KahaDBTransactionStore implements TransactionStore {
abstract Future<Object> run(ConnectionContext ctx) throws IOException; abstract Future<Object> run(ConnectionContext ctx) throws IOException;
} }
public abstract class RemoveMessageCommand { public abstract static class RemoveMessageCommand {
private final ConnectionContext ctx; private final ConnectionContext ctx;
RemoveMessageCommand(ConnectionContext ctx) { RemoveMessageCommand(ConnectionContext ctx) {
@ -237,78 +227,20 @@ public class KahaDBTransactionStore implements TransactionStore {
@Override @Override
public void prepare(TransactionId txid) throws IOException { public void prepare(TransactionId txid) throws IOException {
KahaTransactionInfo info = getTransactionInfo(txid); KahaTransactionInfo info = getTransactionInfo(txid);
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
} else {
Tx tx = inflightTransactions.remove(txid);
if (tx != null) {
theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
}
}
public Tx getTx(Object txid) {
Tx tx = inflightTransactions.get(txid);
if (tx == null) {
synchronized (inflightTransactions) {
tx = inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
}
}
}
return tx;
} }
@Override @Override
public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
throws IOException { throws IOException {
if (txid != null) { if (txid != null) {
if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { KahaTransactionInfo info = getTransactionInfo(txid);
if (preCommit != null) { if (preCommit != null) {
preCommit.run(); preCommit.run();
}
Tx tx = inflightTransactions.remove(txid);
if (tx != null) {
List<Future<Object>> results = tx.commit();
boolean doneSomething = false;
for (Future<Object> result : results) {
try {
result.get();
} catch (InterruptedException e) {
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
} catch (ExecutionException e) {
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
}catch(CancellationException e) {
}
if (!result.isCancelled()) {
doneSomething = true;
}
}
if (postCommit != null) {
postCommit.run();
}
if (doneSomething) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
}
}else {
//The Tx will be null for failed over clients - lets run their post commits
if (postCommit != null) {
postCommit.run();
}
}
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
if (preCommit != null) {
preCommit.run();
}
theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
forgetRecoveredAcks(txid, false);
} }
}else { theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
forgetRecoveredAcks(txid, false);
} else {
LOG.error("Null transaction passed on commit"); LOG.error("Null transaction passed on commit");
} }
} }
@ -319,13 +251,9 @@ public class KahaDBTransactionStore implements TransactionStore {
*/ */
@Override @Override
public void rollback(TransactionId txid) throws IOException { public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { KahaTransactionInfo info = getTransactionInfo(txid);
KahaTransactionInfo info = getTransactionInfo(txid); theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); forgetRecoveredAcks(txid, true);
forgetRecoveredAcks(txid, true);
} else {
inflightTransactions.remove(txid);
}
} }
protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
@ -347,8 +275,8 @@ public class KahaDBTransactionStore implements TransactionStore {
public synchronized void recover(TransactionRecoveryListener listener) throws IOException { public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) { for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey(); XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>(); ArrayList<Message> messageList = new ArrayList<>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); ArrayList<MessageAck> ackList = new ArrayList<>();
for (Operation op : entry.getValue()) { for (Operation op : entry.getValue()) {
if (op.getClass() == MessageDatabase.AddOperation.class) { if (op.getClass() == MessageDatabase.AddOperation.class) {
@ -384,52 +312,14 @@ public class KahaDBTransactionStore implements TransactionStore {
*/ */
void addMessage(ConnectionContext context, final MessageStore destination, final Message message) void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException { throws IOException {
destination.addMessage(context, message);
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.addMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.addMessage(context, message);
}
} }
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException { throws IOException {
if (message.getTransactionId() != null) { if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { destination.addMessage(context, message);
destination.addMessage(context, message); return AbstractMessageStore.FUTURE;
return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddQueueMessage(ctx, message);
}
});
return AbstractMessageStore.FUTURE;
}
} else { } else {
return destination.asyncAddQueueMessage(context, message); return destination.asyncAddQueueMessage(context, message);
} }
@ -439,24 +329,8 @@ public class KahaDBTransactionStore implements TransactionStore {
throws IOException { throws IOException {
if (message.getTransactionId() != null) { if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { destination.addMessage(context, message);
destination.addMessage(context, message); return AbstractMessageStore.FUTURE;
return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddTopicMessage(ctx, message);
}
});
return AbstractMessageStore.FUTURE;
}
} else { } else {
return destination.asyncAddTopicMessage(context, message); return destination.asyncAddTopicMessage(context, message);
} }
@ -468,80 +342,17 @@ public class KahaDBTransactionStore implements TransactionStore {
*/ */
final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException { throws IOException {
destination.removeMessage(context, ack);
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.removeMessage(context, ack);
}
} }
final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException { throws IOException {
destination.removeAsyncMessage(context, ack);
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.removeAsyncMessage(context, ack);
}
} }
final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException { final MessageId messageId, final MessageAck ack) throws IOException {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
} }