From 7fd17f407f00d898f5c77b4ea0527f095855485f Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 10 Jul 2017 18:03:55 -0400 Subject: [PATCH] ARTEMIS-1269 Simple Actor to replace certain executions This is replacing an executor on ServerSessionPacketHandler by a this actor. This is to avoid creating a new runnable per packet received. Instead of creating new Runnable, this will use a single static runnable and the packet will be send by a message, which will be treated by a listener. Look at ServerSessionPacketHandler on this commit for more information on how it works. --- .../commands/tools/xml/XmlDataExporter.java | 2 +- .../artemis/utils/OrderedExecutorFactory.java | 145 ------------------ .../activemq/artemis/utils/actors/Actor.java | 40 +++++ .../artemis/utils/actors/ActorListener.java | 22 +++ .../artemis/utils/actors/OrderedExecutor.java | 62 ++++++++ .../utils/actors/OrderedExecutorFactory.java | 69 +++++++++ .../artemis/utils/actors/ProcessorBase.java | 116 ++++++++++++++ .../client/impl/ClientSessionFactoryImpl.java | 2 +- .../core/journal/impl/JournalImpl.java | 2 +- .../core/ServerSessionPacketHandler.java | 22 ++- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../core/remoting/impl/invm/InVMAcceptor.java | 2 +- .../remoting/impl/invm/InVMConnector.java | 2 +- .../core/replication/ReplicationEndpoint.java | 2 +- .../artemis/core/server/ActiveMQServer.java | 3 + .../core/server/impl/ActiveMQServerImpl.java | 3 +- .../server/files/FileMoveManagerTest.java | 2 +- .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../journal/NIOJournalCompactTest.java | 2 +- .../persistence/JournalFileSizeTest.java | 2 +- .../replication/ReplicationTest.java | 2 +- .../JournalCleanupCompactStressTest.java | 2 +- .../impl/DuplicateDetectionUnitTest.java | 2 +- 23 files changed, 345 insertions(+), 165 deletions(-) delete mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ActorListener.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java index 607f92beb5..f54c3d4859 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java @@ -82,7 +82,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends OptionalLocking { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java deleted file mode 100644 index 65cb08f5b9..0000000000 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.utils; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.jboss.logging.Logger; - -/** - * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance. - */ -public final class OrderedExecutorFactory implements ExecutorFactory { - - private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class); - - private final Executor parent; - - - public static boolean flushExecutor(Executor executor) { - return flushExecutor(executor, 30, TimeUnit.SECONDS); - } - - public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) { - final CountDownLatch latch = new CountDownLatch(1); - executor.execute(latch::countDown); - try { - return latch.await(timeout, unit); - } catch (Exception e) { - return false; - } - } - - /** - * Construct a new instance delegating to the given parent executor. - * - * @param parent the parent executor - */ - public OrderedExecutorFactory(final Executor parent) { - this.parent = parent; - } - - /** - * Get an executor that always executes tasks in order. - * - * @return an ordered executor - */ - @Override - public Executor getExecutor() { - return new OrderedExecutor(parent); - } - - /** - * An executor that always runs all tasks in order, using a delegate executor to run the tasks. - *
- * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the - * same method, will result in B's task running after A's. - */ - private static class OrderedExecutor implements Executor { - - private final Queue tasks = new ConcurrentLinkedQueue<>(); - private final Executor delegate; - private final ExecutorTask task = new ExecutorTask(); - - // used by stateUpdater - @SuppressWarnings("unused") - private volatile int state = 0; - - private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state"); - - private static final int STATE_NOT_RUNNING = 0; - private static final int STATE_RUNNING = 1; - - private OrderedExecutor(Executor delegate) { - this.delegate = delegate; - } - - @Override - public void execute(Runnable command) { - tasks.add(command); - if (stateUpdater.get(this) == STATE_NOT_RUNNING) { - //note that this can result in multiple tasks being queued - //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored - delegate.execute(task); - } - } - - private final class ExecutorTask implements Runnable { - - @Override - public void run() { - do { - //if there is no thread active then we run - if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) { - Runnable task = tasks.poll(); - //while the queue is not empty we process in order - while (task != null) { - try { - task.run(); - } catch (ActiveMQInterruptedException e) { - // This could happen during shutdowns. Nothing to be concerned about here - logger.debug("Interrupted Thread", e); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } - task = tasks.poll(); - } - //set state back to not running. - stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING); - } else { - return; - } - //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, - //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. - //this check fixes the issue - } - while (!tasks.isEmpty()); - } - } - - @Override - public String toString() { - return "OrderedExecutor(tasks=" + tasks + ")"; - } - } -} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java new file mode 100644 index 0000000000..19537b3563 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/Actor.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; + +public class Actor extends ProcessorBase { + + private final ActorListener listener; + + public Actor(Executor parent, ActorListener listener) { + super(parent); + this.listener = listener; + } + + @Override + protected final void doTask(T task) { + listener.onMessage(task); + } + + public final void act(T message) { + task(message); + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ActorListener.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ActorListener.java new file mode 100644 index 0000000000..11d1dc5b73 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ActorListener.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +public interface ActorListener { + void onMessage(T message); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java new file mode 100644 index 0000000000..6f0ee9aa6f --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.jboss.logging.Logger; + +/** + * An executor that always runs all tasks in order, using a delegate executor to run the tasks. + *
+ * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the + * same method, will result in B's task running after A's. + */ +public class OrderedExecutor extends ProcessorBase implements Executor { + + public OrderedExecutor(Executor delegate) { + super(delegate); + } + + private static final Logger logger = Logger.getLogger(OrderedExecutor.class); + + @Override + protected final void doTask(Runnable task) { + try { + task.run(); + } catch (ActiveMQInterruptedException e) { + // This could happen during shutdowns. Nothing to be concerned about here + logger.debug("Interrupted Thread", e); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + + } + + @Override + public final void execute(Runnable run) { + task(run); + } + + @Override + public String toString() { + return "OrderedExecutor(tasks=" + tasks + ")"; + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java new file mode 100644 index 0000000000..da61f3d936 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.utils.ExecutorFactory; +/** + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance. + */ +public final class OrderedExecutorFactory implements ExecutorFactory { + + final Executor parent; + + public static boolean flushExecutor(Executor executor) { + return flushExecutor(executor, 30, TimeUnit.SECONDS); + } + + public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) { + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(latch::countDown); + try { + return latch.await(timeout, unit); + } catch (Exception e) { + return false; + } + } + + /** + * Construct a new instance delegating to the given parent executor. + * + * @param parent the parent executor + */ + public OrderedExecutorFactory(final Executor parent) { + this.parent = parent; + } + + /** + * Get an executor that always executes tasks in order. + * + * @return an ordered executor + */ + @Override + public Executor getExecutor() { + return new OrderedExecutor(parent); + } + + /** I couldn't figure out how to make a new method to return a generic Actor with a given type */ + public Executor getParent() { + return parent; + } +} + diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java new file mode 100644 index 0000000000..07ed9e943c --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public abstract class ProcessorBase { + + private static final int STATE_NOT_RUNNING = 0; + private static final int STATE_RUNNING = 1; + + protected final Queue tasks = new ConcurrentLinkedQueue<>(); + + private final Executor delegate; + + private final ExecutorTask task = new ExecutorTask(); + + // used by stateUpdater + @SuppressWarnings("unused") + private volatile int state = 0; + + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); + + private final class ExecutorTask implements Runnable { + + @Override + public void run() { + do { + //if there is no thread active then we run + if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { + T task = tasks.poll(); + //while the queue is not empty we process in order + while (task != null) { + doTask(task); + task = tasks.poll(); + } + //set state back to not running. + stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING); + } else { + return; + } + //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, + //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. + //this check fixes the issue + } + while (!tasks.isEmpty()); + } + } + + protected abstract void doTask(T task); + + public ProcessorBase(Executor parent) { + this.delegate = parent; + } + + public final boolean flush() { + return flush(30, TimeUnit.SECONDS); + } + + /** + * WARNING: This will only flush when all the activity is suspended. + * don't expect success on this call if another thread keeps feeding the queue + * this is only valid on situations where you are not feeding the queue, + * like in shutdown and failover situations. + * */ + public final boolean flush(long timeout, TimeUnit unit) { + if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + // quick test, most of the time it will be empty anyways + return true; + } + + long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); + try { + while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { + Thread.sleep(10); + } + } catch (InterruptedException e) { + // ignored + } + + return stateUpdater.get(this) == STATE_NOT_RUNNING; + } + + protected void task(T command) { + tasks.add(command); + startPoller(); + } + + protected void startPoller() { + if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + //note that this can result in multiple tasks being queued + //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored + delegate.execute(task); + } + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 5106ed3904..cc26b7d281 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -64,7 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 513ed030f8..bb557e4fb1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -73,7 +73,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 54d1d44157..0c95bed095 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -85,13 +85,15 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.Actor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.jboss.logging.Logger; @@ -145,6 +147,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { private volatile CoreRemotingConnection remotingConnection; + private final Actor packetActor; + private final Executor callExecutor; private final CoreProtocolManager manager; @@ -154,7 +158,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; - public ServerSessionPacketHandler(final Executor callExecutor, + public ServerSessionPacketHandler(final ActiveMQServer server, final CoreProtocolManager manager, final ServerSession session, final StorageManager storageManager, @@ -173,7 +177,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { Connection conn = remotingConnection.getTransportConnection(); - this.callExecutor = callExecutor; + this.callExecutor = server.getExecutorFactory().getExecutor(); + + // TODO: I wish I could figure out how to create this through OrderedExecutor + this.packetActor = new Actor<>(server.getThreadPool(), this::onMessagePacket); if (conn instanceof NettyConnection) { direct = ((NettyConnection) conn).isDirectDeliver(); @@ -214,6 +221,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } private void flushExecutor() { + packetActor.flush(); OrderedExecutorFactory.flushExecutor(callExecutor); } @@ -236,10 +244,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { @Override public void handlePacket(final Packet packet) { channel.confirm(packet); - callExecutor.execute(() -> internalHandlePacket(packet)); + + // This method will call onMessagePacket through an actor + packetActor.act(packet); } - private void internalHandlePacket(final Packet packet) { + + // this method is used as a listener on the packetActor + private void onMessagePacket(final Packet packet) { byte type = packet.getType(); storageManager.setContext(session.getSessionContext()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 765d6fa458..c9cc926b1a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -169,7 +169,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap); - ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), protocolManager, session, server.getStorageManager(), channel); + ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel); channel.setHandler(handler); // TODO - where is this removed? diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java index 5cae1e51f0..8c2bee524c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java @@ -38,7 +38,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index d600a7bc77..d3ac0fdfeb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -42,7 +42,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ConfigurationHelper; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.jboss.logging.Logger; public class InVMConnector extends AbstractConnector { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 6683fbeed0..fed4805bcd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -79,7 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.jboss.logging.Logger; /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 5d4aaeb8d8..78ce8a2a5c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -421,6 +422,8 @@ public interface ActiveMQServer extends ServiceComponent { void removeClientConnection(String clientId); + Executor getThreadPool(); + AddressInfo getAddressInfo(SimpleString address); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index dec338be0d..fe523077ae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -164,7 +164,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.SecurityFormatter; import org.apache.activemq.artemis.utils.TimeUtils; @@ -649,6 +649,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { externalComponents.add(externalComponent); } + @Override public ExecutorService getThreadPool() { return threadPool; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index d53cdd9151..f47c827571 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.Assert; import org.junit.Before; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index a95f77a256..736ded1de6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -138,7 +138,7 @@ import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.FileUtil; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UUIDGenerator; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index f873ffd5ef..c830459abd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -52,7 +52,7 @@ import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEnco import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.jboss.logging.Logger; import org.junit.After; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java index 7803479293..749715b7c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java @@ -21,7 +21,7 @@ import java.io.File; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.junit.Assert; import org.junit.Test; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index d67b980dee..3095ab9788 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -86,7 +86,7 @@ import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java index 7493949999..34198d1a2a 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java @@ -44,7 +44,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.junit.After; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index 58c5c4f45a..21bc48dc97 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -40,7 +40,7 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.After; import org.junit.Assert;