This closes #1447
This commit is contained in:
commit
d4a7aebb6d
|
@ -22,7 +22,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -59,6 +58,7 @@ import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectReposito
|
|||
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
|
||||
public class PrintData extends OptionalLocking {
|
||||
|
@ -146,8 +146,8 @@ public class PrintData extends OptionalLocking {
|
|||
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
|
||||
ExecutorFactory execfactory = new ExecutorFactory() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return executor;
|
||||
public ArtemisExecutor getExecutor() {
|
||||
return ArtemisExecutor.delegate(executor);
|
||||
}
|
||||
};
|
||||
final StorageManager sm = new NullStorageManager();
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -82,6 +81,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
||||
|
||||
|
@ -387,8 +387,8 @@ public final class XmlDataExporter extends OptionalLocking {
|
|||
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
|
||||
ExecutorFactory executorFactory = new ExecutorFactory() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return executor;
|
||||
public ArtemisExecutor getExecutor() {
|
||||
return ArtemisExecutor.delegate(executor);
|
||||
}
|
||||
};
|
||||
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
public interface ExecutorFactory {
|
||||
|
||||
Executor getExecutor();
|
||||
ArtemisExecutor getExecutor();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils.actors;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public interface ArtemisExecutor extends Executor {
|
||||
|
||||
/**
|
||||
* Artemis is supposed to implement this properly, however in tests or tools
|
||||
* this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
|
||||
* @param executor
|
||||
* @return
|
||||
*/
|
||||
static ArtemisExecutor delegate(Executor executor) {
|
||||
return new ArtemisExecutor() {
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
executor.execute(command);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
default boolean flush() {
|
||||
return flush(30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
default boolean flush(long timeout, TimeUnit unit) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
execute(runnable);
|
||||
try {
|
||||
return latch.await(timeout, unit);
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
|
||||
* @return
|
||||
*/
|
||||
default boolean isFlushed() {
|
||||
return flush(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
|
@ -28,7 +28,7 @@ import org.jboss.logging.Logger;
|
|||
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
|
||||
* same method, will result in B's task running after A's.
|
||||
*/
|
||||
public class OrderedExecutor extends ProcessorBase<Runnable> implements Executor {
|
||||
public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
|
||||
|
||||
public OrderedExecutor(Executor delegate) {
|
||||
super(delegate);
|
||||
|
|
|
@ -57,7 +57,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
|
|||
* @return an ordered executor
|
||||
*/
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
public ArtemisExecutor getExecutor() {
|
||||
return new OrderedExecutor(parent);
|
||||
}
|
||||
|
||||
|
|
|
@ -100,6 +100,10 @@ public abstract class ProcessorBase<T> {
|
|||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
public final boolean isFlushed() {
|
||||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
protected void task(T command) {
|
||||
tasks.add(command);
|
||||
startPoller();
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.io;
|
|||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -38,6 +37,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
|
|||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||
|
@ -91,7 +91,7 @@ public class JournalTptBenchmark {
|
|||
} else {
|
||||
final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
|
||||
service = Executors.newSingleThreadExecutor();
|
||||
journal = new JournalImpl(() -> new Executor() {
|
||||
journal = new JournalImpl(() -> new ArtemisExecutor() {
|
||||
|
||||
private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* The integration point between the PagingManger and the File System (aka SequentialFiles)
|
||||
|
@ -38,7 +39,7 @@ public interface PagingStoreFactory {
|
|||
PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor);
|
||||
ArtemisExecutor executor);
|
||||
|
||||
void stop() throws InterruptedException;
|
||||
|
||||
|
|
|
@ -16,13 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.paging.cursor;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||
|
||||
public interface PageSubscription {
|
||||
|
@ -155,7 +154,7 @@ public interface PageSubscription {
|
|||
/**
|
||||
* @return executor used by the PageSubscription
|
||||
*/
|
||||
Executor getExecutor();
|
||||
ArtemisExecutor getExecutor();
|
||||
|
||||
/**
|
||||
* @param deletedPage
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
|
@ -41,6 +40,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
|||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.SoftValueHashMap;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -68,7 +68,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
protected final StorageManager storageManager;
|
||||
|
||||
// This is the same executor used at the PageStoreImpl. One Executor per pageStore
|
||||
private final Executor executor;
|
||||
private final ArtemisExecutor executor;
|
||||
|
||||
private final SoftValueHashMap<Long, PageCache> softCache;
|
||||
|
||||
|
@ -80,7 +80,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
|
||||
public PageCursorProviderImpl(final PagingStore pagingStore,
|
||||
final StorageManager storageManager,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final int maxCacheSize) {
|
||||
this.pagingStore = pagingStore;
|
||||
this.storageManager = storageManager;
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.Map.Entry;
|
|||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
|
|||
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -92,14 +92,14 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
private final PageSubscriptionCounter counter;
|
||||
|
||||
private final Executor executor;
|
||||
private final ArtemisExecutor executor;
|
||||
|
||||
private final AtomicLong deliveredCount = new AtomicLong(0);
|
||||
|
||||
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
|
||||
final PagingStore pageStore,
|
||||
final StorageManager store,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final Filter filter,
|
||||
final long cursorId,
|
||||
final boolean persistent) {
|
||||
|
@ -743,7 +743,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
public ArtemisExecutor getExecutor() {
|
||||
return executor;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriv
|
|||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* Integration point between Paging and JDBC
|
||||
|
@ -153,7 +154,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
ArtemisExecutor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* Integration point between Paging and NIO
|
||||
|
@ -115,7 +116,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
ArtemisExecutor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.util.List;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -62,6 +61,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
|||
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -101,7 +101,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
private final boolean usingGlobalMaxSize;
|
||||
|
||||
private final Executor executor;
|
||||
private final ArtemisExecutor executor;
|
||||
|
||||
// Bytes consumed by the queue on the memory
|
||||
private final AtomicLong sizeInBytes = new AtomicLong();
|
||||
|
@ -137,7 +137,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
final PagingStoreFactory storeFactory,
|
||||
final SimpleString storeName,
|
||||
final AddressSettings addressSettings,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final boolean syncNonTransactional) {
|
||||
if (pagingManager == null) {
|
||||
throw new IllegalStateException("Paging Manager can't be null");
|
||||
|
|
|
@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server.impl;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
|
@ -29,13 +29,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/**
|
||||
* A queue that will discard messages if a newer message with the same
|
||||
|
@ -65,7 +65,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
@ -89,6 +88,7 @@ import org.apache.activemq.artemis.utils.Env;
|
|||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
|
||||
|
@ -118,7 +118,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
public static final int MAX_DELIVERIES_IN_LOOP = 1000;
|
||||
|
||||
public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
|
||||
public static final int CHECK_QUEUE_SIZE_PERIOD = 1000;
|
||||
|
||||
/**
|
||||
* If The system gets slow for any reason, this is the maximum time a Delivery or
|
||||
|
@ -228,7 +228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private int pos;
|
||||
|
||||
private final Executor executor;
|
||||
private final ArtemisExecutor executor;
|
||||
|
||||
private boolean internalQueue;
|
||||
|
||||
|
@ -342,7 +342,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
|
@ -361,7 +361,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
|
||||
|
@ -383,7 +383,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final ActiveMQServer server,
|
||||
final QueueFactory factory) {
|
||||
super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
|
||||
|
@ -654,19 +654,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
return;
|
||||
}
|
||||
|
||||
synchronized (directDeliveryGuard) {
|
||||
// The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
|
||||
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
|
||||
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
|
||||
if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
|
||||
lastDirectDeliveryCheck = System.currentTimeMillis();
|
||||
if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Checking to re-enable direct deliver on queue " + this.getName());
|
||||
}
|
||||
lastDirectDeliveryCheck = System.currentTimeMillis();
|
||||
synchronized (directDeliveryGuard) {
|
||||
// The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
|
||||
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
|
||||
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
|
||||
|
||||
if (intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
|
||||
if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
|
||||
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
|
||||
// deliveries
|
||||
if (flushExecutor() && flushDeliveriesInTransit()) {
|
||||
// Go into direct delivery mode
|
||||
directDeliver = supportsDirectDeliver;
|
||||
// Go into direct delivery mode
|
||||
directDeliver = supportsDirectDeliver;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Setting direct deliverer to " + supportsDirectDeliver);
|
||||
}
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Couldn't set direct deliver back");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -773,7 +781,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
public ArtemisExecutor getExecutor() {
|
||||
if (pageSubscription != null && pageSubscription.isPaging()) {
|
||||
// When in page mode, we don't want to have concurrent IO on the same PageStore
|
||||
return pageSubscription.getExecutor();
|
||||
|
@ -791,7 +799,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public boolean flushExecutor() {
|
||||
boolean ok = internalFlushExecutor(10000);
|
||||
boolean ok = internalFlushExecutor(10000, true);
|
||||
|
||||
if (!ok) {
|
||||
ActiveMQServerLogger.LOGGER.errorFlushingExecutorsOnQueue();
|
||||
|
@ -800,14 +808,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
return ok;
|
||||
}
|
||||
|
||||
private boolean internalFlushExecutor(long timeout) {
|
||||
private boolean internalFlushExecutor(long timeout, boolean log) {
|
||||
FutureLatch future = new FutureLatch();
|
||||
|
||||
getExecutor().execute(future);
|
||||
|
||||
boolean result = future.await(timeout);
|
||||
|
||||
if (!result) {
|
||||
if (log && !result) {
|
||||
ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout);
|
||||
}
|
||||
return result;
|
||||
|
@ -2344,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
private void internalAddRedistributor(final Executor executor) {
|
||||
private void internalAddRedistributor(final ArtemisExecutor executor) {
|
||||
// create the redistributor only once if there are no local consumers
|
||||
if (consumerSet.isEmpty() && redistributor == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -2745,9 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
private void proceedDeliver(Consumer consumer, MessageReference reference) {
|
||||
try {
|
||||
consumer.proceedDeliver(reference);
|
||||
deliveriesInTransit.countDown();
|
||||
} catch (Throwable t) {
|
||||
deliveriesInTransit.countDown();
|
||||
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
|
||||
|
||||
synchronized (this) {
|
||||
|
@ -2761,6 +2767,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
// The message failed to be delivered, hence we try again
|
||||
addHead(reference, false);
|
||||
}
|
||||
} finally {
|
||||
deliveriesInTransit.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2949,9 +2957,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private class DelayedAddRedistributor implements Runnable {
|
||||
|
||||
private final Executor executor1;
|
||||
private final ArtemisExecutor executor1;
|
||||
|
||||
DelayedAddRedistributor(final Executor executor) {
|
||||
DelayedAddRedistributor(final ArtemisExecutor executor) {
|
||||
this.executor1 = executor;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,17 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
@ -74,6 +72,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -236,7 +235,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
|||
final PostOffice postOffice,
|
||||
final StorageManager storageManager,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final Executor executor, final ActiveMQServer server) {
|
||||
final ArtemisExecutor executor, final ActiveMQServer server) {
|
||||
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
|
||||
maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
|
||||
addressSettingsRepository, executor, server, null);
|
||||
|
|
|
@ -16,19 +16,17 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
|
@ -64,6 +62,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
|||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -518,7 +517,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
|
|||
PostOffice postOffice,
|
||||
StorageManager storageManager,
|
||||
HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
Executor executor) {
|
||||
ArtemisExecutor executor) {
|
||||
super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
|
||||
postOffice, storageManager, addressSettingsRepository, executor, null, null);
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -86,6 +85,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl
|
|||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -3056,7 +3056,7 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
InterruptedCursorProvider(PagingStore pagingStore,
|
||||
StorageManager storageManager,
|
||||
Executor executor,
|
||||
ArtemisExecutor executor,
|
||||
int maxCacheSize) {
|
||||
super(pagingStore, storageManager, executor, maxCacheSize);
|
||||
}
|
||||
|
@ -3082,7 +3082,7 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
ArtemisExecutor executor) {
|
||||
return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
};
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
|||
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakeConsumer;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -70,7 +71,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
QueueImpl queue =
|
||||
new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
|
||||
false, scheduledExecutor, null, null, null,
|
||||
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
|
||||
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
|
||||
|
||||
// Send one scheduled
|
||||
|
||||
|
@ -135,7 +136,8 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testScheduled() throws Exception {
|
||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
|
||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null,
|
||||
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
|
||||
|
||||
FakeConsumer consumer = null;
|
||||
|
||||
|
@ -233,7 +235,8 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
public void disconnect() {
|
||||
}
|
||||
};
|
||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
|
||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null,
|
||||
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
|
||||
MessageReference messageReference = generateReference(queue, 1);
|
||||
queue.addConsumer(consumer);
|
||||
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.List;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -67,6 +66,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -743,8 +743,8 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
return new ExecutorFactory() {
|
||||
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return executor;
|
||||
public ArtemisExecutor getExecutor() {
|
||||
return ArtemisExecutor.delegate(executor);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -818,7 +818,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
|||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
ArtemisExecutor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOff
|
|||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -1310,6 +1311,6 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
|
||||
private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
|
||||
return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
|
||||
new FakePostOffice(), null, null, executor, null, null);
|
||||
new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.QueueConfig;
|
|||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
public final class FakeQueueFactory implements QueueFactory {
|
||||
|
||||
|
@ -42,7 +43,7 @@ public final class FakeQueueFactory implements QueueFactory {
|
|||
public Queue createQueueWith(final QueueConfig config) {
|
||||
return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
|
||||
config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
|
||||
scheduledExecutor, postOffice, null, null, executor, null, this);
|
||||
scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
@ -57,7 +58,7 @@ public final class FakeQueueFactory implements QueueFactory {
|
|||
final boolean temporary,
|
||||
final boolean autoCreated) {
|
||||
return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
|
||||
scheduledExecutor, postOffice, null, null, executor, null, this);
|
||||
scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue