From 675fb7bcae3a84c9fcbb1e35506fd688066fc59e Mon Sep 17 00:00:00 2001 From: Colm O hEigeartaigh Date: Mon, 20 Jan 2020 18:35:01 +0000 Subject: [PATCH] AMQ-7376 - Use correct type for collections retrieval --- .../apache/activemq/broker/region/Queue.java | 40 +++++-------------- .../activemq/store/kahadb/KahaDBStore.java | 2 +- .../transport/mqtt/MQTTProtocolConverter.java | 3 +- 3 files changed, 12 insertions(+), 33 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 86b8c6dccf..a183b673e3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -417,25 +417,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } - /* - * Holder for subscription that needs attention on next iterate browser - * needs access to existing messages in the queue that have already been - * dispatched - */ - class BrowserDispatch { - QueueBrowserSubscription browser; - - public BrowserDispatch(QueueBrowserSubscription browserSubscription) { - browser = browserSubscription; - browser.incrementQueueRef(); - } - - public QueueBrowserSubscription getBrowser() { - return browser; - } - } - - ConcurrentLinkedQueue browserDispatches = new ConcurrentLinkedQueue(); + ConcurrentLinkedQueue browserSubscriptions = new ConcurrentLinkedQueue<>(); @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { @@ -484,8 +466,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (sub instanceof QueueBrowserSubscription) { // tee up for dispatch in next iterate QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; - BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); - browserDispatches.add(browserDispatch); + browserSubscription.incrementQueueRef(); + browserSubscriptions.add(browserSubscription); } if (!this.optimizedDispatch) { @@ -598,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty()); if (sub instanceof QueueBrowserSubscription) { ((QueueBrowserSubscription)sub).decrementQueueRef(); - browserDispatches.remove(sub); + browserSubscriptions.remove(sub); } // AMQ-5107: don't resend if the broker is shutting down if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { @@ -1723,7 +1705,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.readLock().unlock(); } - boolean hasBrowsers = !browserDispatches.isEmpty(); + boolean hasBrowsers = !browserSubscriptions.isEmpty(); if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { @@ -1743,15 +1725,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInMessagesLock.readLock().unlock(); } - Iterator browsers = browserDispatches.iterator(); + Iterator browsers = browserSubscriptions.iterator(); while (browsers.hasNext()) { - BrowserDispatch browserDispatch = browsers.next(); + QueueBrowserSubscription browser = browsers.next(); try { MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(destination); - QueueBrowserSubscription browser = browserDispatch.getBrowser(); - LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size()); boolean added = false; for (MessageReference node : messagesInMemory) { @@ -1766,12 +1746,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // are we done browsing? no new messages paged if (!added || browser.atMax()) { browser.decrementQueueRef(); - browserDispatches.remove(browserDispatch); + browsers.remove(); } else { wakeup(); } } catch (Exception e) { - LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e); + LOG.warn("exception on dispatch to browser: {}", browser, e); } } } @@ -2136,7 +2116,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } private final boolean haveRealConsumer() { - return consumers.size() - browserDispatches.size() > 0; + return consumers.size() - browserSubscriptions.size() > 0; } private void doDispatch(PendingList list) throws Exception { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index a8af5aecd6..0d7feba6c2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -404,7 +404,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException { - ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination)); + ProxyMessageStore store = (ProxyMessageStore) storeCache.get(key(convert(activeMQDestination))); if (store == null) { if (activeMQDestination.isQueue()) { store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination); diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 51475f9bfe..5cd0474986 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.DataFormatException; import java.util.zip.Inflater; -import javax.jms.Destination; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.Message; @@ -558,7 +557,7 @@ public class MQTTProtocolConverter { ActiveMQDestination destination; synchronized (activeMQDestinationMap) { - destination = activeMQDestinationMap.get(command.topicName()); + destination = activeMQDestinationMap.get(command.topicName().toString()); if (destination == null) { String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); try {