AMQ-7376 - Use correct type for collections retrieval

This commit is contained in:
Colm O hEigeartaigh 2020-01-20 18:35:01 +00:00
parent d41658715b
commit 675fb7bcae
3 changed files with 12 additions and 33 deletions

View File

@ -417,25 +417,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
} }
/* ConcurrentLinkedQueue<QueueBrowserSubscription> browserSubscriptions = new ConcurrentLinkedQueue<>();
* Holder for subscription that needs attention on next iterate browser
* needs access to existing messages in the queue that have already been
* dispatched
*/
class BrowserDispatch {
QueueBrowserSubscription browser;
public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
browser = browserSubscription;
browser.incrementQueueRef();
}
public QueueBrowserSubscription getBrowser() {
return browser;
}
}
ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
@Override @Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
@ -484,8 +466,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (sub instanceof QueueBrowserSubscription) { if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate // tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); browserSubscription.incrementQueueRef();
browserDispatches.add(browserDispatch); browserSubscriptions.add(browserSubscription);
} }
if (!this.optimizedDispatch) { if (!this.optimizedDispatch) {
@ -598,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty()); dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
if (sub instanceof QueueBrowserSubscription) { if (sub instanceof QueueBrowserSubscription) {
((QueueBrowserSubscription)sub).decrementQueueRef(); ((QueueBrowserSubscription)sub).decrementQueueRef();
browserDispatches.remove(sub); browserSubscriptions.remove(sub);
} }
// AMQ-5107: don't resend if the broker is shutting down // AMQ-5107: don't resend if the broker is shutting down
if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
@ -1723,7 +1705,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.readLock().unlock(); pagedInPendingDispatchLock.readLock().unlock();
} }
boolean hasBrowsers = !browserDispatches.isEmpty(); boolean hasBrowsers = !browserSubscriptions.isEmpty();
if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
try { try {
@ -1743,15 +1725,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }
Iterator<BrowserDispatch> browsers = browserDispatches.iterator(); Iterator<QueueBrowserSubscription> browsers = browserSubscriptions.iterator();
while (browsers.hasNext()) { while (browsers.hasNext()) {
BrowserDispatch browserDispatch = browsers.next(); QueueBrowserSubscription browser = browsers.next();
try { try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination); msgContext.setDestination(destination);
QueueBrowserSubscription browser = browserDispatch.getBrowser();
LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size()); LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
boolean added = false; boolean added = false;
for (MessageReference node : messagesInMemory) { for (MessageReference node : messagesInMemory) {
@ -1766,12 +1746,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// are we done browsing? no new messages paged // are we done browsing? no new messages paged
if (!added || browser.atMax()) { if (!added || browser.atMax()) {
browser.decrementQueueRef(); browser.decrementQueueRef();
browserDispatches.remove(browserDispatch); browsers.remove();
} else { } else {
wakeup(); wakeup();
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e); LOG.warn("exception on dispatch to browser: {}", browser, e);
} }
} }
} }
@ -2136,7 +2116,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
private final boolean haveRealConsumer() { private final boolean haveRealConsumer() {
return consumers.size() - browserDispatches.size() > 0; return consumers.size() - browserSubscriptions.size() > 0;
} }
private void doDispatch(PendingList list) throws Exception { private void doDispatch(PendingList list) throws Exception {

View File

@ -404,7 +404,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
} }
private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException { private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination)); ProxyMessageStore store = (ProxyMessageStore) storeCache.get(key(convert(activeMQDestination)));
if (store == null) { if (store == null) {
if (activeMQDestination.isQueue()) { if (activeMQDestination.isQueue()) {
store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination); store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);

View File

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Inflater; import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -558,7 +557,7 @@ public class MQTTProtocolConverter {
ActiveMQDestination destination; ActiveMQDestination destination;
synchronized (activeMQDestinationMap) { synchronized (activeMQDestinationMap) {
destination = activeMQDestinationMap.get(command.topicName()); destination = activeMQDestinationMap.get(command.topicName().toString());
if (destination == null) { if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
try { try {