git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1187538 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-21 21:02:56 +00:00
parent 5b796cd8d6
commit c273cab44a
1 changed files with 12 additions and 2 deletions

View File

@ -445,6 +445,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// public for testing
@SuppressWarnings("rawtypes")
public Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
@ -854,7 +855,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
if (!checkpointThread.isAlive()) {
if (checkpointThread == null || !checkpointThread.isAlive()) {
startCheckpoint();
}
return location;
@ -967,6 +968,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
}
@SuppressWarnings("rawtypes")
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@ -985,6 +987,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@ -1047,6 +1050,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> inflightTx;
@ -1077,6 +1081,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
protected void process(KahaPrepareCommand command, Location location) {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
synchronized (inflightTransactions) {
@ -1087,6 +1092,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
protected void process(KahaRollbackCommand command, Location location) throws IOException {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> updates = null;
@ -1101,6 +1107,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
private void persistRedeliveryCount(List<Operation> updates) throws IOException {
if (updates != null) {
for (Operation operation : updates) {
@ -1557,7 +1564,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
if (metadata.version >= 3) {
if (metadata.version >= VERSION) {
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
} else {
// upgrade
@ -1963,7 +1970,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
@SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>();
@ -1994,6 +2003,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = TransactionIdConversion.convert(info);
List<Operation> tx;