diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java index 20cbfae4b0..9a93c27ef9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -59,6 +58,7 @@ import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectReposito import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)") public class PrintData extends OptionalLocking { @@ -146,8 +146,8 @@ public class PrintData extends OptionalLocking { final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory()); ExecutorFactory execfactory = new ExecutorFactory() { @Override - public Executor getExecutor() { - return executor; + public ArtemisExecutor getExecutor() { + return ArtemisExecutor.delegate(executor); } }; final StorageManager sm = new NullStorageManager(); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java index d37c965b89..f297a76e7b 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java @@ -31,7 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -82,6 +81,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -387,8 +387,8 @@ public final class XmlDataExporter extends OptionalLocking { final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory()); ExecutorFactory executorFactory = new ExecutorFactory() { @Override - public Executor getExecutor() { - return executor; + public ArtemisExecutor getExecutor() { + return ArtemisExecutor.delegate(executor); } }; PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java index dd0209baa2..7fe33f04d4 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.utils; -import java.util.concurrent.Executor; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; public interface ExecutorFactory { - Executor getExecutor(); + ArtemisExecutor getExecutor(); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java new file mode 100644 index 0000000000..d3036ec76f --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +public interface ArtemisExecutor extends Executor { + + /** + * Artemis is supposed to implement this properly, however in tests or tools + * this can be used as a fake, doing a sipmle delegate and using the default methods implemented here. + * @param executor + * @return + */ + static ArtemisExecutor delegate(Executor executor) { + return new ArtemisExecutor() { + @Override + public void execute(Runnable command) { + executor.execute(command); + } + }; + } + + default boolean flush() { + return flush(30, TimeUnit.SECONDS); + } + + default boolean flush(long timeout, TimeUnit unit) { + CountDownLatch latch = new CountDownLatch(1); + Runnable runnable = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + execute(runnable); + try { + return latch.await(timeout, unit); + } catch (InterruptedException e) { + return false; + } + } + + /** + * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor} + * @return + */ + default boolean isFlushed() { + return flush(100, TimeUnit.MILLISECONDS); + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java index 6f0ee9aa6f..8a02497b00 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java @@ -28,7 +28,7 @@ import org.jboss.logging.Logger; * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the * same method, will result in B's task running after A's. */ -public class OrderedExecutor extends ProcessorBase implements Executor { +public class OrderedExecutor extends ProcessorBase implements ArtemisExecutor { public OrderedExecutor(Executor delegate) { super(delegate); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java index da61f3d936..07e1e7bedb 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java @@ -57,7 +57,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory { * @return an ordered executor */ @Override - public Executor getExecutor() { + public ArtemisExecutor getExecutor() { return new OrderedExecutor(parent); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index 07ed9e943c..fcd197d143 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -100,6 +100,10 @@ public abstract class ProcessorBase { return stateUpdater.get(this) == STATE_NOT_RUNNING; } + public final boolean isFlushed() { + return stateUpdater.get(this) == STATE_NOT_RUNNING; + } + protected void task(T command) { tasks.add(command); startPoller(); diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java index d0bada8482..6c44296fad 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java @@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.io; import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -38,6 +37,7 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM @@ -91,7 +91,7 @@ public class JournalTptBenchmark { } else { final ArrayList> tasks = new ArrayList<>(); service = Executors.newSingleThreadExecutor(); - journal = new JournalImpl(() -> new Executor() { + journal = new JournalImpl(() -> new ArtemisExecutor() { private final MpscArrayQueue taskQueue = new MpscArrayQueue<>(1024); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index 75799d2cfb..6cfaf20900 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** * The integration point between the PagingManger and the File System (aka SequentialFiles) @@ -38,7 +39,7 @@ public interface PagingStoreFactory { PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, - Executor executor); + ArtemisExecutor executor); void stop() throws InterruptedException; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index cec7f526ad..985f563e04 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.paging.cursor; -import java.util.concurrent.Executor; - import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; public interface PageSubscription { @@ -155,7 +154,7 @@ public interface PageSubscription { /** * @return executor used by the PageSubscription */ - Executor getExecutor(); + ArtemisExecutor getExecutor(); /** * @param deletedPage diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 701f86cd57..c1e1761c40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.filter.Filter; @@ -41,6 +40,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.SoftValueHashMap; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; /** @@ -68,7 +68,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { protected final StorageManager storageManager; // This is the same executor used at the PageStoreImpl. One Executor per pageStore - private final Executor executor; + private final ArtemisExecutor executor; private final SoftValueHashMap softCache; @@ -80,7 +80,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { public PageCursorProviderImpl(final PagingStore pagingStore, final StorageManager storageManager, - final Executor executor, + final ArtemisExecutor executor, final int maxCacheSize) { this.pagingStore = pagingStore; this.storageManager = storageManager; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index f0451519da..fa2c748130 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -28,7 +28,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.FutureLatch; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -92,14 +92,14 @@ final class PageSubscriptionImpl implements PageSubscription { private final PageSubscriptionCounter counter; - private final Executor executor; + private final ArtemisExecutor executor; private final AtomicLong deliveredCount = new AtomicLong(0); PageSubscriptionImpl(final PageCursorProvider cursorProvider, final PagingStore pageStore, final StorageManager store, - final Executor executor, + final ArtemisExecutor executor, final Filter filter, final long cursorId, final boolean persistent) { @@ -743,7 +743,7 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override - public Executor getExecutor() { + public ArtemisExecutor getExecutor() { return executor; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 2daa89ef82..444c59ce9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -45,6 +45,7 @@ import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriv import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** * Integration point between Paging and JDBC @@ -153,7 +154,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, - Executor executor) { + ArtemisExecutor executor) { return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index c65b913cd7..b2e3d4f923 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** * Integration point between Paging and NIO @@ -115,7 +116,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, - Executor executor) { + ArtemisExecutor executor) { return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index ad9e21801b..6f85aa24b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +61,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.utils.FutureLatch; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; /** @@ -101,7 +101,7 @@ public class PagingStoreImpl implements PagingStore { private final boolean usingGlobalMaxSize; - private final Executor executor; + private final ArtemisExecutor executor; // Bytes consumed by the queue on the memory private final AtomicLong sizeInBytes = new AtomicLong(); @@ -137,7 +137,7 @@ public class PagingStoreImpl implements PagingStore { final PagingStoreFactory storeFactory, final SimpleString storeName, final AddressSettings addressSettings, - final Executor executor, + final ArtemisExecutor executor, final boolean syncNonTransactional) { if (pagingManager == null) { throw new IllegalStateException("Paging Manager can't be null"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 192f25c7f1..5951a01675 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -29,13 +29,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** * A queue that will discard messages if a newer message with the same @@ -65,7 +65,7 @@ public class LastValueQueue extends QueueImpl { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor, + final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index ecfbf0970e..35dd5ed47a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -89,6 +88,7 @@ import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; @@ -118,7 +118,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public static final int MAX_DELIVERIES_IN_LOOP = 1000; - public static final int CHECK_QUEUE_SIZE_PERIOD = 100; + public static final int CHECK_QUEUE_SIZE_PERIOD = 1000; /** * If The system gets slow for any reason, this is the maximum time a Delivery or @@ -228,7 +228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private int pos; - private final Executor executor; + private final ArtemisExecutor executor; private boolean internalQueue; @@ -342,7 +342,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor, + final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); @@ -361,7 +361,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor, + final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); @@ -383,7 +383,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor, + final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS); @@ -654,19 +654,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return; } - synchronized (directDeliveryGuard) { - // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the - // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty - // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { - lastDirectDeliveryCheck = System.currentTimeMillis(); + if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { + if (logger.isTraceEnabled()) { + logger.trace("Checking to re-enable direct deliver on queue " + this.getName()); + } + lastDirectDeliveryCheck = System.currentTimeMillis(); + synchronized (directDeliveryGuard) { + // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the + // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty + // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) { + if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) { // We must block on the executor to ensure any async deliveries have completed or we might get out of order // deliveries - if (flushExecutor() && flushDeliveriesInTransit()) { - // Go into direct delivery mode - directDeliver = supportsDirectDeliver; + // Go into direct delivery mode + directDeliver = supportsDirectDeliver; + if (logger.isTraceEnabled()) { + logger.trace("Setting direct deliverer to " + supportsDirectDeliver); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("Couldn't set direct deliver back"); } } } @@ -773,7 +781,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public Executor getExecutor() { + public ArtemisExecutor getExecutor() { if (pageSubscription != null && pageSubscription.isPaging()) { // When in page mode, we don't want to have concurrent IO on the same PageStore return pageSubscription.getExecutor(); @@ -791,7 +799,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean flushExecutor() { - boolean ok = internalFlushExecutor(10000); + boolean ok = internalFlushExecutor(10000, true); if (!ok) { ActiveMQServerLogger.LOGGER.errorFlushingExecutorsOnQueue(); @@ -800,14 +808,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return ok; } - private boolean internalFlushExecutor(long timeout) { + private boolean internalFlushExecutor(long timeout, boolean log) { FutureLatch future = new FutureLatch(); getExecutor().execute(future); boolean result = future.await(timeout); - if (!result) { + if (log && !result) { ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout); } return result; @@ -2344,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private void internalAddRedistributor(final Executor executor) { + private void internalAddRedistributor(final ArtemisExecutor executor) { // create the redistributor only once if there are no local consumers if (consumerSet.isEmpty() && redistributor == null) { if (logger.isTraceEnabled()) { @@ -2745,9 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private void proceedDeliver(Consumer consumer, MessageReference reference) { try { consumer.proceedDeliver(reference); - deliveriesInTransit.countDown(); } catch (Throwable t) { - deliveriesInTransit.countDown(); ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); synchronized (this) { @@ -2761,6 +2767,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // The message failed to be delivered, hence we try again addHead(reference, false); } + } finally { + deliveriesInTransit.countDown(); } } @@ -2949,9 +2957,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private class DelayedAddRedistributor implements Runnable { - private final Executor executor1; + private final ArtemisExecutor executor1; - DelayedAddRedistributor(final Executor executor) { + DelayedAddRedistributor(final ArtemisExecutor executor) { this.executor1 = executor; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index afe1403d55..40ccefce0b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -16,17 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; - import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; @@ -74,6 +72,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -236,7 +235,7 @@ public class HangConsumerTest extends ActiveMQTestBase { final PostOffice postOffice, final StorageManager storageManager, final HierarchicalRepository addressSettingsRepository, - final Executor executor, final ActiveMQServer server) { + final ArtemisExecutor executor, final ActiveMQServer server) { super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, null); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 66381fa145..6056fcb98e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -16,19 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.client; -import java.io.IOException; -import java.util.HashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; @@ -64,6 +62,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -518,7 +517,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { PostOffice postOffice, StorageManager storageManager, HierarchicalRepository addressSettingsRepository, - Executor executor) { + ArtemisExecutor executor) { super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, null, null); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 7eadeca236..542d94d99f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -32,7 +32,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -86,6 +85,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -3056,7 +3056,7 @@ public class PagingTest extends ActiveMQTestBase { InterruptedCursorProvider(PagingStore pagingStore, StorageManager storageManager, - Executor executor, + ArtemisExecutor executor, int maxCacheSize) { super(pagingStore, storageManager, executor, maxCacheSize); } @@ -3082,7 +3082,7 @@ public class PagingTest extends ActiveMQTestBase { public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, - Executor executor) { + ArtemisExecutor executor) { return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); } }; diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java index b5dc4634fb..ab36b33ec8 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakeConsumer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -70,7 +71,7 @@ public class QueueImplTest extends ActiveMQTestBase { QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, - Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null); + ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); // Send one scheduled @@ -135,7 +136,8 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduled() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null); + QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, + ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); FakeConsumer consumer = null; @@ -233,7 +235,8 @@ public class QueueImplTest extends ActiveMQTestBase { public void disconnect() { } }; - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null); + QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, + ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); MessageReference messageReference = generateReference(queue, 1); queue.addConsumer(consumer); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 4ddde1cc8d..d261f64df3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -67,6 +66,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -743,8 +743,8 @@ public class PagingStoreImplTest extends ActiveMQTestBase { return new ExecutorFactory() { @Override - public Executor getExecutor() { - return executor; + public ArtemisExecutor getExecutor() { + return ArtemisExecutor.delegate(executor); } }; } @@ -818,7 +818,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, - Executor executor) { + ArtemisExecutor executor) { return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 40f9214ced..0aa6e5c84a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOff import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.FutureLatch; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.junit.After; import org.junit.Assert; @@ -1310,6 +1311,6 @@ public class QueueImplTest extends ActiveMQTestBase { private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) { return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, - new FakePostOffice(), null, null, executor, null, null); + new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 4721579ac8..88f0019fa2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; public final class FakeQueueFactory implements QueueFactory { @@ -42,7 +43,7 @@ public final class FakeQueueFactory implements QueueFactory { public Queue createQueueWith(final QueueConfig config) { return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), - scheduledExecutor, postOffice, null, null, executor, null, this); + scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this); } @Deprecated @@ -57,7 +58,7 @@ public final class FakeQueueFactory implements QueueFactory { final boolean temporary, final boolean autoCreated) { return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, - scheduledExecutor, postOffice, null, null, executor, null, this); + scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this); } @Override