This commit is contained in:
gtully 2014-03-03 16:20:21 +00:00
parent 5d511975fc
commit ab01ae3818
1 changed files with 2 additions and 13 deletions

View File

@ -2302,7 +2302,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>(); protected final Set<String> ackedAndPrepared = new HashSet<String>();
protected final Set<String> rolledBackAcks = new HashSet<String>();
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store. // till then they are skipped by the store.
@ -2318,16 +2317,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
} }
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
if (acks != null) { if (acks != null) {
this.indexLock.writeLock().lock(); this.indexLock.writeLock().lock();
try { try {
for (MessageAck ack : acks) { for (MessageAck ack : acks) {
final String id = ack.getLastMessageId().toProducerKey(); ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
}
} }
} finally { } finally {
this.indexLock.writeLock().unlock(); this.indexLock.writeLock().unlock();
@ -2950,12 +2945,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return lastGetPriority; return lastGetPriority;
} }
public boolean alreadyDispatched(Long sequence) {
return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
(cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
(cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
}
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
Iterator<Entry<Long, MessageKeys>>currentIterator; Iterator<Entry<Long, MessageKeys>>currentIterator;
final Iterator<Entry<Long, MessageKeys>>highIterator; final Iterator<Entry<Long, MessageKeys>>highIterator;