Fix slow ack replication.

This commit is contained in:
Nikita Shupletsov 2023-06-26 16:13:42 -07:00
parent a062fcb87f
commit e8d92fe5a5
1 changed files with 12 additions and 11 deletions

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -2475,26 +2476,26 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
QueueMessageReference message = null; QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId(); MessageId messageId = messageDispatchNotification.getMessageId();
Set<MessageReference> set = new LinkedHashSet<MessageReference>(); int size = 0;
do { do {
doPageIn(true, false, getMaxPageSize()); doPageIn(true, false, getMaxPageSize());
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
try { try {
if (!set.addAll(pagedInMessages.values())) { if (pagedInMessages.size() == size) {
// nothing new to check - mem constraint on page in // nothing new to check - mem constraint on page in
break; break;
};
} finally {
pagedInMessagesLock.readLock().unlock();
} }
List<MessageReference> list = new ArrayList<MessageReference>(set); size = pagedInMessages.size();
for (MessageReference ref : list) { for (MessageReference ref : pagedInMessages) {
if (ref.getMessageId().equals(messageId)) { if (ref.getMessageId().equals(messageId)) {
message = (QueueMessageReference) ref; message = (QueueMessageReference) ref;
break; break;
} }
} }
} while (set.size() < this.destinationStatistics.getMessages().getCount()); } finally {
pagedInMessagesLock.readLock().unlock();
}
} while (size < this.destinationStatistics.getMessages().getCount());
if (message == null) { if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: " throw new JMSException("Slave broker out of sync with master - Message: "