This commit is contained in:
Clebert Suconic 2018-02-26 17:25:45 -05:00
commit 40e8daf819
6 changed files with 27 additions and 23 deletions

View File

@ -50,6 +50,23 @@ public interface ArtemisExecutor extends Executor {
return 0;
}
/** To be used to flush an executor from a different thread.
* WARNING: Do not call this within the executor. That would be stoopid ;)
*
* @param timeout
* @param unit
* @return
*/
default boolean flush(long timeout, TimeUnit unit) {
CountDownLatch latch = new CountDownLatch(1);
execute(latch::countDown);
try {
return latch.await(timeout, unit);
} catch (Exception e) {
return false;
}
}
/**
* It will wait the current execution (if there is one) to finish
* but will not complete any further executions

View File

@ -26,7 +26,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ActiveMQConsumerResourceTest {
@ -58,13 +57,7 @@ public class ActiveMQConsumerResourceTest {
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getMessageCount(TEST_QUEUE) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE), 1, server.getMessageCount(TEST_QUEUE));
Wait.assertEquals(1, () -> server.getMessageCount(TEST_QUEUE));
ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);

View File

@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.filter.Filter;
@ -38,7 +39,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.SoftValueHashMap;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
@ -244,12 +244,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
private void waitForFuture() {
FutureLatch future = new FutureLatch();
if (!executor.flush(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timedOutStoppingPagingCursor(executor);
executor.execute(future);
while (!future.await(10000)) {
ActiveMQServerLogger.LOGGER.timedOutStoppingPagingCursor(future, executor);
}
}

View File

@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -54,7 +55,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
@ -703,9 +703,7 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void flushExecutors() {
FutureLatch future = new FutureLatch();
executor.execute(future);
while (!future.await(1000)) {
if (!executor.flush(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timedOutFlushingExecutorsPagingCursor(this);
}
}

View File

@ -61,7 +61,6 @@ import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImp
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
@ -512,8 +511,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void problemUndeployingNode(@Cause Exception e, Node node);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222022, value = "Timed out waiting for paging cursor to stop {0} {1}", format = Message.Format.MESSAGE_FORMAT)
void timedOutStoppingPagingCursor(FutureLatch future, Executor executor);
@Message(id = 222022, value = "Timed out waiting for paging cursor to stop {0}", format = Message.Format.MESSAGE_FORMAT)
void timedOutStoppingPagingCursor(Executor executor);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222023, value = "problem cleaning page address {0}", format = Message.Format.MESSAGE_FORMAT)

View File

@ -2121,9 +2121,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO);
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO);
}
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
}
/**