ARTEMIS-4686 reduce number of FilterImpl instances
Whenever we create a queue with a filter we're instantiating 3 different `org.apache.activemq.artemis.core.filter.impl.FilterImpl` objects. This is wasteful and entirely avoidable.
This commit is contained in:
parent
cb2b293810
commit
57ed2c30b3
|
@ -34,7 +34,7 @@ public interface QueueFactory {
|
|||
@Deprecated
|
||||
Queue createQueueWith(QueueConfig config) throws Exception;
|
||||
|
||||
Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception;
|
||||
Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception;
|
||||
|
||||
/**
|
||||
* @deprecated Replaced by {@link #createQueueWith}
|
||||
|
|
|
@ -4127,9 +4127,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
queueConfiguration.setId(storageManager.generateID());
|
||||
|
||||
// preemptive check to ensure the filterString is good
|
||||
FilterImpl.createFilter(queueConfiguration.getFilterString());
|
||||
Filter filter = FilterImpl.createFilter(queueConfiguration.getFilterString());
|
||||
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager);
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager, filter);
|
||||
|
||||
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
|
||||
|
||||
|
|
|
@ -112,6 +112,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
.setAutoDeleteMessageCount(autoDeleteMessageCount)
|
||||
.setConfigurationManaged(configurationManaged)
|
||||
.setLastValueKey(lastValueKey),
|
||||
filter,
|
||||
pagingStore,
|
||||
pageSubscription,
|
||||
scheduledExecutor,
|
||||
|
@ -124,6 +125,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
}
|
||||
|
||||
public LastValueQueue(final QueueConfiguration queueConfiguration,
|
||||
final Filter filter,
|
||||
final PagingStore pagingStore,
|
||||
final PageSubscription pageSubscription,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
|
@ -133,7 +135,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
super(queueConfiguration, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
this.lastValueKey = queueConfiguration.getLastValueKey();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
|||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.FilterUtils;
|
||||
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
|
@ -121,10 +122,11 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
int duplicateID = 0;
|
||||
for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) {
|
||||
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
|
||||
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
|
||||
|
||||
if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) {
|
||||
|
||||
if (FilterUtils.isTopicIdentification(FilterImpl.createFilter(queueBindingInfo.getFilterString()))) {
|
||||
if (FilterUtils.isTopicIdentification(filter)) {
|
||||
final long tx = storageManager.generateID();
|
||||
storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
|
||||
storageManager.commitBindings(tx);
|
||||
|
@ -163,7 +165,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
.setConfigurationManaged(queueBindingInfo.isConfigurationManaged())
|
||||
.setRingSize(queueBindingInfo.getRingSize())
|
||||
.setInternal(queueBindingInfo.isInternal()),
|
||||
pagingManager);
|
||||
pagingManager,
|
||||
filter);
|
||||
|
||||
|
||||
if (queueBindingInfo.getQueueStatusEncodings() != null) {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.activemq.artemis.api.core.Message;
|
|||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
|
@ -89,14 +88,14 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager) {
|
||||
public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager, Filter filter) {
|
||||
validateState(config);
|
||||
final Queue queue;
|
||||
PageSubscription pageSubscription = getPageSubscription(config, pagingManager);
|
||||
PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter);
|
||||
if (lastValueKey(config) != null) {
|
||||
queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
} else {
|
||||
queue = new QueueImpl(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
|
||||
}
|
||||
server.getCriticalAnalyzer().add(queue);
|
||||
return queue;
|
||||
|
@ -136,13 +135,13 @@ public class QueueFactoryImpl implements QueueFactory {
|
|||
server.getCriticalAnalyzer().remove(queue);
|
||||
}
|
||||
|
||||
public static PageSubscription getPageSubscription(QueueConfiguration queueConfiguration, PagingManager pagingManager) {
|
||||
public static PageSubscription getPageSubscription(QueueConfiguration queueConfiguration, PagingManager pagingManager, Filter filter) {
|
||||
PageSubscription pageSubscription;
|
||||
|
||||
try {
|
||||
PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getAddress());
|
||||
if (pageStore != null) {
|
||||
pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable());
|
||||
pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), filter, queueConfiguration.isDurable());
|
||||
} else {
|
||||
pageSubscription = null;
|
||||
}
|
||||
|
|
|
@ -629,6 +629,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
.setAutoDeleteMessageCount(autoDeleteMessageCount)
|
||||
.setConfigurationManaged(configurationManaged)
|
||||
.setRingSize(ringSize),
|
||||
filter,
|
||||
pagingStore,
|
||||
pageSubscription,
|
||||
scheduledExecutor,
|
||||
|
@ -638,10 +639,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
executor,
|
||||
server,
|
||||
factory);
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
public QueueImpl(final QueueConfiguration queueConfiguration,
|
||||
final Filter filter,
|
||||
final PagingStore pagingStore,
|
||||
final PageSubscription pageSubscription,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
|
|
|
@ -50,9 +50,9 @@ public final class FakeQueueFactory implements QueueFactory {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception {
|
||||
PageSubscription pageSubscription = QueueFactoryImpl.getPageSubscription(config, pagingManager);
|
||||
return new QueueImpl(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
|
||||
public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception {
|
||||
PageSubscription pageSubscription = QueueFactoryImpl.getPageSubscription(config, pagingManager, filter);
|
||||
return new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
|
|
@ -221,6 +221,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
* @param executor
|
||||
*/
|
||||
MyQueueWithBlocking(final QueueConfiguration queueConfiguration,
|
||||
final Filter filter,
|
||||
final PagingStore pagingStore,
|
||||
final PageSubscription pageSubscription,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
|
@ -229,6 +230,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final ArtemisExecutor executor, final ActiveMQServer server) {
|
||||
super(queueConfiguration,
|
||||
filter,
|
||||
pagingStore,
|
||||
pageSubscription,
|
||||
scheduledExecutor,
|
||||
|
@ -268,9 +270,9 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager) {
|
||||
PageSubscription pageSubscription = getPageSubscription(config, pagingManager);
|
||||
queue = new MyQueueWithBlocking(config, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor,
|
||||
public Queue createQueueWith(final QueueConfiguration config, PagingManager pagingManager, Filter filter) {
|
||||
PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter);
|
||||
queue = new MyQueueWithBlocking(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor,
|
||||
postOffice, storageManager, addressSettingsRepository,
|
||||
executorFactory.getExecutor(), server);
|
||||
return queue;
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.Session;
|
|||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -43,7 +44,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
|
@ -72,7 +72,6 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
public class InterruptedLargeMessageTest extends LargeMessageTestBase {
|
||||
|
||||
|
@ -573,8 +572,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception {
|
||||
return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), FilterImpl.createFilter(config.getFilterString()), config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor());
|
||||
public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager, Filter filter) throws Exception {
|
||||
return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), filter, config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager, filter), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor());
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
|
Loading…
Reference in New Issue