diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java index 0dce140f46..98c0564a26 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java @@ -34,7 +34,7 @@ public interface QueueFactory { @Deprecated Queue createQueueWith(QueueConfig config) throws Exception; - Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception; + Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception; /** * @deprecated Replaced by {@link #createQueueWith} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 20cace6b67..6813bee2d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -4127,9 +4127,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { queueConfiguration.setId(storageManager.generateID()); // preemptive check to ensure the filterString is good - FilterImpl.createFilter(queueConfiguration.getFilterString()); + Filter filter = FilterImpl.createFilter(queueConfiguration.getFilterString()); - final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager); + final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager, filter); final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index e67ae7dab1..c08f28b95b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -112,6 +112,7 @@ public class LastValueQueue extends QueueImpl { .setAutoDeleteMessageCount(autoDeleteMessageCount) .setConfigurationManaged(configurationManaged) .setLastValueKey(lastValueKey), + filter, pagingStore, pageSubscription, scheduledExecutor, @@ -124,6 +125,7 @@ public class LastValueQueue extends QueueImpl { } public LastValueQueue(final QueueConfiguration queueConfiguration, + final Filter filter, final PagingStore pagingStore, final PageSubscription pageSubscription, final ScheduledExecutorService scheduledExecutor, @@ -133,7 +135,7 @@ public class LastValueQueue extends QueueImpl { final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { - super(queueConfiguration, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); this.lastValueKey = queueConfiguration.getLastValueKey(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 3dc127894a..e6c6f706cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.FilterUtils; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.journal.Journal; @@ -121,10 +122,11 @@ public class PostOfficeJournalLoader implements JournalLoader { int duplicateID = 0; for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) { queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo); + Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString()); if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) { - if (FilterUtils.isTopicIdentification(FilterImpl.createFilter(queueBindingInfo.getFilterString()))) { + if (FilterUtils.isTopicIdentification(filter)) { final long tx = storageManager.generateID(); storageManager.deleteQueueBinding(tx, queueBindingInfo.getId()); storageManager.commitBindings(tx); @@ -163,7 +165,8 @@ public class PostOfficeJournalLoader implements JournalLoader { .setConfigurationManaged(queueBindingInfo.isConfigurationManaged()) .setRingSize(queueBindingInfo.getRingSize()) .setInternal(queueBindingInfo.isInternal()), - pagingManager); + pagingManager, + filter); if (queueBindingInfo.getQueueStatusEncodings() != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 13e3c313b3..4464fefdf8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -23,7 +23,6 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -89,14 +88,14 @@ public class QueueFactoryImpl implements QueueFactory { } @Override - public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager) { + public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager, Filter filter) { validateState(config); final Queue queue; - PageSubscription pageSubscription = getPageSubscription(config, pagingManager); + PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter); if (lastValueKey(config) != null) { - queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { - queue = new QueueImpl(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } server.getCriticalAnalyzer().add(queue); return queue; @@ -136,13 +135,13 @@ public class QueueFactoryImpl implements QueueFactory { server.getCriticalAnalyzer().remove(queue); } - public static PageSubscription getPageSubscription(QueueConfiguration queueConfiguration, PagingManager pagingManager) { + public static PageSubscription getPageSubscription(QueueConfiguration queueConfiguration, PagingManager pagingManager, Filter filter) { PageSubscription pageSubscription; try { PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getAddress()); if (pageStore != null) { - pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable()); + pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), filter, queueConfiguration.isDurable()); } else { pageSubscription = null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index b7bb6676c1..fb2808ab5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -629,6 +629,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { .setAutoDeleteMessageCount(autoDeleteMessageCount) .setConfigurationManaged(configurationManaged) .setRingSize(ringSize), + filter, pagingStore, pageSubscription, scheduledExecutor, @@ -638,10 +639,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { executor, server, factory); - this.filter = filter; } public QueueImpl(final QueueConfiguration queueConfiguration, + final Filter filter, final PagingStore pagingStore, final PageSubscription pageSubscription, final ScheduledExecutorService scheduledExecutor, diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 93e5655001..0d272c95ae 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -50,9 +50,9 @@ public final class FakeQueueFactory implements QueueFactory { } @Override - public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception { - PageSubscription pageSubscription = QueueFactoryImpl.getPageSubscription(config, pagingManager); - return new QueueImpl(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this); + public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception { + PageSubscription pageSubscription = QueueFactoryImpl.getPageSubscription(config, pagingManager, filter); + return new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this); } @Deprecated diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 6cb056689f..7c43b2df30 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -221,6 +221,7 @@ public class HangConsumerTest extends ActiveMQTestBase { * @param executor */ MyQueueWithBlocking(final QueueConfiguration queueConfiguration, + final Filter filter, final PagingStore pagingStore, final PageSubscription pageSubscription, final ScheduledExecutorService scheduledExecutor, @@ -229,6 +230,7 @@ public class HangConsumerTest extends ActiveMQTestBase { final HierarchicalRepository addressSettingsRepository, final ArtemisExecutor executor, final ActiveMQServer server) { super(queueConfiguration, + filter, pagingStore, pageSubscription, scheduledExecutor, @@ -268,9 +270,9 @@ public class HangConsumerTest extends ActiveMQTestBase { } @Override - public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager) { - PageSubscription pageSubscription = getPageSubscription(config, pagingManager); - queue = new MyQueueWithBlocking(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, + public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager, Filter filter) { + PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter); + queue = new MyQueueWithBlocking(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server); return queue; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 97bbf5209b..ea3d0e5561 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -23,6 +23,7 @@ import javax.jms.Session; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -43,7 +44,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -72,7 +72,6 @@ import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; public class InterruptedLargeMessageTest extends LargeMessageTestBase { @@ -573,8 +572,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { } @Override - public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception { - return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), FilterImpl.createFilter(config.getFilterString()), config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor()); + public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception { + return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), filter, config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager, filter), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor()); } @Deprecated