ARTEMIS-1495 Removing flushes from codebase
Instead of flushing we just need to make sure there are no more calls into page executors as we stop the PageManager. This will avoid any possible starvations or deadlocks here.
This commit is contained in:
parent
8bf879f156
commit
2e6176a693
|
@ -25,7 +25,7 @@ public interface ArtemisExecutor extends Executor {
|
|||
|
||||
/**
|
||||
* Artemis is supposed to implement this properly, however in tests or tools
|
||||
* this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
|
||||
* this can be used as a fake, doing a simple delegate and using the default methods implemented here.
|
||||
* @param executor
|
||||
* @return
|
||||
*/
|
||||
|
@ -38,11 +38,16 @@ public interface ArtemisExecutor extends Executor {
|
|||
};
|
||||
}
|
||||
|
||||
default boolean flush() {
|
||||
return flush(30, TimeUnit.SECONDS);
|
||||
/** It will wait the current execution (if there is one) to finish
|
||||
* but will not complete any further executions */
|
||||
default void shutdownNow() {
|
||||
}
|
||||
|
||||
default boolean flush(long timeout, TimeUnit unit) {
|
||||
/**
|
||||
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
|
||||
* @return
|
||||
*/
|
||||
default boolean isFlushed() {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
|
@ -52,18 +57,10 @@ public interface ArtemisExecutor extends Executor {
|
|||
};
|
||||
execute(runnable);
|
||||
try {
|
||||
return latch.await(timeout, unit);
|
||||
return latch.await(100, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
|
||||
* @return
|
||||
*/
|
||||
default boolean isFlushed() {
|
||||
return flush(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.activemq.artemis.utils.actors;
|
|||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
public abstract class ProcessorBase<T> {
|
||||
|
@ -34,6 +33,9 @@ public abstract class ProcessorBase<T> {
|
|||
|
||||
private final ExecutorTask task = new ExecutorTask();
|
||||
|
||||
private final Object startedGuard = new Object();
|
||||
private volatile boolean started = true;
|
||||
|
||||
// used by stateUpdater
|
||||
@SuppressWarnings("unused")
|
||||
private volatile int state = 0;
|
||||
|
@ -49,8 +51,18 @@ public abstract class ProcessorBase<T> {
|
|||
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
||||
T task = tasks.poll();
|
||||
//while the queue is not empty we process in order
|
||||
while (task != null) {
|
||||
doTask(task);
|
||||
|
||||
// All we care on started, is that a current task is not running as we call shutdown.
|
||||
// for that reason this first run doesn't need to be under any lock
|
||||
while (task != null && started) {
|
||||
|
||||
// Synchronized here is just to guarantee that a current task is finished before
|
||||
// the started update can be taken as false
|
||||
synchronized (startedGuard) {
|
||||
if (started) {
|
||||
doTask(task);
|
||||
}
|
||||
}
|
||||
task = tasks.poll();
|
||||
}
|
||||
//set state back to not running.
|
||||
|
@ -66,52 +78,32 @@ public abstract class ProcessorBase<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/** It will wait the current execution (if there is one) to finish
|
||||
* but will not complete any further executions */
|
||||
public void shutdownNow() {
|
||||
synchronized (startedGuard) {
|
||||
started = false;
|
||||
}
|
||||
tasks.clear();
|
||||
}
|
||||
|
||||
protected abstract void doTask(T task);
|
||||
|
||||
public ProcessorBase(Executor parent) {
|
||||
this.delegate = parent;
|
||||
}
|
||||
|
||||
public final boolean flush() {
|
||||
return flush(30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* WARNING: This will only flush when all the activity is suspended.
|
||||
* don't expect success on this call if another thread keeps feeding the queue
|
||||
* this is only valid on situations where you are not feeding the queue,
|
||||
* like in shutdown and failover situations.
|
||||
* */
|
||||
public final boolean flush(long timeout, TimeUnit unit) {
|
||||
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||
// quick test, most of the time it will be empty anyways
|
||||
return true;
|
||||
}
|
||||
|
||||
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
|
||||
try {
|
||||
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
|
||||
|
||||
if (tasks.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// ignored
|
||||
}
|
||||
|
||||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
public final boolean isFlushed() {
|
||||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
protected void task(T command) {
|
||||
tasks.add(command);
|
||||
startPoller();
|
||||
// There is no need to verify the lock here.
|
||||
// you can only turn of running once
|
||||
if (started) {
|
||||
tasks.add(command);
|
||||
startPoller();
|
||||
}
|
||||
}
|
||||
|
||||
protected void startPoller() {
|
||||
|
|
|
@ -240,7 +240,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
cursor.stop();
|
||||
}
|
||||
|
||||
waitForFuture();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
private void waitForFuture() {
|
||||
|
|
|
@ -352,7 +352,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
running = false;
|
||||
|
||||
flushExecutors();
|
||||
executor.shutdownNow();
|
||||
|
||||
if (currentPage != null) {
|
||||
currentPage.close(false);
|
||||
|
|
|
@ -220,7 +220,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
|
||||
ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
|
||||
|
||||
flushExecutor();
|
||||
closeExecutors();
|
||||
|
||||
try {
|
||||
session.close(true);
|
||||
|
@ -248,15 +248,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
inHandler.set(null);
|
||||
}
|
||||
|
||||
public void flushExecutor() {
|
||||
if (!inHandler()) {
|
||||
packetActor.flush();
|
||||
callExecutor.flush();
|
||||
}
|
||||
public void closeExecutors() {
|
||||
packetActor.shutdownNow();
|
||||
callExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
public void close() {
|
||||
flushExecutor();
|
||||
closeExecutors();
|
||||
|
||||
channel.flushConfirmations();
|
||||
|
||||
|
@ -895,8 +893,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
remotingConnection.removeFailureListener((FailureListener) closeListener);
|
||||
}
|
||||
}
|
||||
|
||||
flushExecutor();
|
||||
}
|
||||
|
||||
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
|
||||
|
|
|
@ -67,7 +67,7 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
ServerSessionPacketHandler localHandler = handler;
|
||||
if (localHandler != null) {
|
||||
// We wait any pending tasks before we make this as closed
|
||||
localHandler.flushExecutor();
|
||||
localHandler.closeExecutors();
|
||||
}
|
||||
this.handler = null;
|
||||
}
|
||||
|
|
|
@ -257,7 +257,9 @@ public class ManagementServiceImpl implements ManagementService {
|
|||
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
|
||||
unregisterFromJMX(objectName);
|
||||
unregisterFromRegistry(ResourceNames.QUEUE + name);
|
||||
messageCounterManager.unregisterMessageCounter(name.toString());
|
||||
if (messageCounterManager != null) {
|
||||
messageCounterManager.unregisterMessageCounter(name.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -303,8 +304,9 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
conn.close();
|
||||
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
Queue queue = server.locateQueue(queueName);
|
||||
Wait.assertEquals(0, queue::getDeliveringCount);
|
||||
Wait.assertEquals(0, queue::getMessageCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -329,8 +331,9 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
|
||||
// Messages should all have been acked since we set pre ack on the cf
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
Queue queue = server.locateQueue(queueName);
|
||||
Wait.assertEquals(0, queue::getDeliveringCount);
|
||||
Wait.assertEquals(0, queue::getMessageCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue