ARTEMIS-1269 Simple Actor to replace certain executions
This is replacing an executor on ServerSessionPacketHandler by a this actor. This is to avoid creating a new runnable per packet received. Instead of creating new Runnable, this will use a single static runnable and the packet will be send by a message, which will be treated by a listener. Look at ServerSessionPacketHandler on this commit for more information on how it works.
This commit is contained in:
parent
82f071ff46
commit
7fd17f407f
|
@ -82,7 +82,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
|
|
||||||
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
|
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
|
||||||
public final class XmlDataExporter extends OptionalLocking {
|
public final class XmlDataExporter extends OptionalLocking {
|
||||||
|
|
|
@ -1,145 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.utils;
|
|
||||||
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
|
||||||
import org.jboss.logging.Logger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
|
|
||||||
*/
|
|
||||||
public final class OrderedExecutorFactory implements ExecutorFactory {
|
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
|
|
||||||
|
|
||||||
private final Executor parent;
|
|
||||||
|
|
||||||
|
|
||||||
public static boolean flushExecutor(Executor executor) {
|
|
||||||
return flushExecutor(executor, 30, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
executor.execute(latch::countDown);
|
|
||||||
try {
|
|
||||||
return latch.await(timeout, unit);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Construct a new instance delegating to the given parent executor.
|
|
||||||
*
|
|
||||||
* @param parent the parent executor
|
|
||||||
*/
|
|
||||||
public OrderedExecutorFactory(final Executor parent) {
|
|
||||||
this.parent = parent;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get an executor that always executes tasks in order.
|
|
||||||
*
|
|
||||||
* @return an ordered executor
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public Executor getExecutor() {
|
|
||||||
return new OrderedExecutor(parent);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An executor that always runs all tasks in order, using a delegate executor to run the tasks.
|
|
||||||
* <br>
|
|
||||||
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
|
|
||||||
* same method, will result in B's task running after A's.
|
|
||||||
*/
|
|
||||||
private static class OrderedExecutor implements Executor {
|
|
||||||
|
|
||||||
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
|
|
||||||
private final Executor delegate;
|
|
||||||
private final ExecutorTask task = new ExecutorTask();
|
|
||||||
|
|
||||||
// used by stateUpdater
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
private volatile int state = 0;
|
|
||||||
|
|
||||||
private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
|
|
||||||
|
|
||||||
private static final int STATE_NOT_RUNNING = 0;
|
|
||||||
private static final int STATE_RUNNING = 1;
|
|
||||||
|
|
||||||
private OrderedExecutor(Executor delegate) {
|
|
||||||
this.delegate = delegate;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void execute(Runnable command) {
|
|
||||||
tasks.add(command);
|
|
||||||
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
|
||||||
//note that this can result in multiple tasks being queued
|
|
||||||
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
|
|
||||||
delegate.execute(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class ExecutorTask implements Runnable {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
do {
|
|
||||||
//if there is no thread active then we run
|
|
||||||
if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
|
||||||
Runnable task = tasks.poll();
|
|
||||||
//while the queue is not empty we process in order
|
|
||||||
while (task != null) {
|
|
||||||
try {
|
|
||||||
task.run();
|
|
||||||
} catch (ActiveMQInterruptedException e) {
|
|
||||||
// This could happen during shutdowns. Nothing to be concerned about here
|
|
||||||
logger.debug("Interrupted Thread", e);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.warn(t.getMessage(), t);
|
|
||||||
}
|
|
||||||
task = tasks.poll();
|
|
||||||
}
|
|
||||||
//set state back to not running.
|
|
||||||
stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
|
|
||||||
} else {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
|
||||||
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
|
||||||
//this check fixes the issue
|
|
||||||
}
|
|
||||||
while (!tasks.isEmpty());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "OrderedExecutor(tasks=" + tasks + ")";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
public class Actor<T> extends ProcessorBase<T> {
|
||||||
|
|
||||||
|
private final ActorListener<T> listener;
|
||||||
|
|
||||||
|
public Actor(Executor parent, ActorListener<T> listener) {
|
||||||
|
super(parent);
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final void doTask(T task) {
|
||||||
|
listener.onMessage(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void act(T message) {
|
||||||
|
task(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
public interface ActorListener<T> {
|
||||||
|
void onMessage(T message);
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An executor that always runs all tasks in order, using a delegate executor to run the tasks.
|
||||||
|
* <br>
|
||||||
|
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
|
||||||
|
* same method, will result in B's task running after A's.
|
||||||
|
*/
|
||||||
|
public class OrderedExecutor extends ProcessorBase<Runnable> implements Executor {
|
||||||
|
|
||||||
|
public OrderedExecutor(Executor delegate) {
|
||||||
|
super(delegate);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(OrderedExecutor.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final void doTask(Runnable task) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (ActiveMQInterruptedException e) {
|
||||||
|
// This could happen during shutdowns. Nothing to be concerned about here
|
||||||
|
logger.debug("Interrupted Thread", e);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn(t.getMessage(), t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void execute(Runnable run) {
|
||||||
|
task(run);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "OrderedExecutor(tasks=" + tasks + ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
/**
|
||||||
|
* A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
|
||||||
|
*/
|
||||||
|
public final class OrderedExecutorFactory implements ExecutorFactory {
|
||||||
|
|
||||||
|
final Executor parent;
|
||||||
|
|
||||||
|
public static boolean flushExecutor(Executor executor) {
|
||||||
|
return flushExecutor(executor, 30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
executor.execute(latch::countDown);
|
||||||
|
try {
|
||||||
|
return latch.await(timeout, unit);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a new instance delegating to the given parent executor.
|
||||||
|
*
|
||||||
|
* @param parent the parent executor
|
||||||
|
*/
|
||||||
|
public OrderedExecutorFactory(final Executor parent) {
|
||||||
|
this.parent = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an executor that always executes tasks in order.
|
||||||
|
*
|
||||||
|
* @return an ordered executor
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Executor getExecutor() {
|
||||||
|
return new OrderedExecutor(parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** I couldn't figure out how to make a new method to return a generic Actor with a given type */
|
||||||
|
public Executor getParent() {
|
||||||
|
return parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
|
|
||||||
|
public abstract class ProcessorBase<T> {
|
||||||
|
|
||||||
|
private static final int STATE_NOT_RUNNING = 0;
|
||||||
|
private static final int STATE_RUNNING = 1;
|
||||||
|
|
||||||
|
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
private final Executor delegate;
|
||||||
|
|
||||||
|
private final ExecutorTask task = new ExecutorTask();
|
||||||
|
|
||||||
|
// used by stateUpdater
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
private volatile int state = 0;
|
||||||
|
|
||||||
|
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
|
||||||
|
|
||||||
|
private final class ExecutorTask implements Runnable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
do {
|
||||||
|
//if there is no thread active then we run
|
||||||
|
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
||||||
|
T task = tasks.poll();
|
||||||
|
//while the queue is not empty we process in order
|
||||||
|
while (task != null) {
|
||||||
|
doTask(task);
|
||||||
|
task = tasks.poll();
|
||||||
|
}
|
||||||
|
//set state back to not running.
|
||||||
|
stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
||||||
|
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
||||||
|
//this check fixes the issue
|
||||||
|
}
|
||||||
|
while (!tasks.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doTask(T task);
|
||||||
|
|
||||||
|
public ProcessorBase(Executor parent) {
|
||||||
|
this.delegate = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean flush() {
|
||||||
|
return flush(30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* WARNING: This will only flush when all the activity is suspended.
|
||||||
|
* don't expect success on this call if another thread keeps feeding the queue
|
||||||
|
* this is only valid on situations where you are not feeding the queue,
|
||||||
|
* like in shutdown and failover situations.
|
||||||
|
* */
|
||||||
|
public final boolean flush(long timeout, TimeUnit unit) {
|
||||||
|
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||||
|
// quick test, most of the time it will be empty anyways
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
|
||||||
|
try {
|
||||||
|
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignored
|
||||||
|
}
|
||||||
|
|
||||||
|
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void task(T command) {
|
||||||
|
tasks.add(command);
|
||||||
|
startPoller();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startPoller() {
|
||||||
|
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||||
|
//note that this can result in multiple tasks being queued
|
||||||
|
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
|
||||||
|
delegate.execute(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -64,7 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||||
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
|
@ -73,7 +73,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFuture;
|
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
|
|
|
@ -85,13 +85,15 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.Actor;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFuture;
|
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -145,6 +147,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
private volatile CoreRemotingConnection remotingConnection;
|
private volatile CoreRemotingConnection remotingConnection;
|
||||||
|
|
||||||
|
private final Actor<Packet> packetActor;
|
||||||
|
|
||||||
private final Executor callExecutor;
|
private final Executor callExecutor;
|
||||||
|
|
||||||
private final CoreProtocolManager manager;
|
private final CoreProtocolManager manager;
|
||||||
|
@ -154,7 +158,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
private final boolean direct;
|
private final boolean direct;
|
||||||
|
|
||||||
public ServerSessionPacketHandler(final Executor callExecutor,
|
public ServerSessionPacketHandler(final ActiveMQServer server,
|
||||||
final CoreProtocolManager manager,
|
final CoreProtocolManager manager,
|
||||||
final ServerSession session,
|
final ServerSession session,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
|
@ -173,7 +177,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
Connection conn = remotingConnection.getTransportConnection();
|
Connection conn = remotingConnection.getTransportConnection();
|
||||||
|
|
||||||
this.callExecutor = callExecutor;
|
this.callExecutor = server.getExecutorFactory().getExecutor();
|
||||||
|
|
||||||
|
// TODO: I wish I could figure out how to create this through OrderedExecutor
|
||||||
|
this.packetActor = new Actor<>(server.getThreadPool(), this::onMessagePacket);
|
||||||
|
|
||||||
if (conn instanceof NettyConnection) {
|
if (conn instanceof NettyConnection) {
|
||||||
direct = ((NettyConnection) conn).isDirectDeliver();
|
direct = ((NettyConnection) conn).isDirectDeliver();
|
||||||
|
@ -214,6 +221,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushExecutor() {
|
private void flushExecutor() {
|
||||||
|
packetActor.flush();
|
||||||
OrderedExecutorFactory.flushExecutor(callExecutor);
|
OrderedExecutorFactory.flushExecutor(callExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,10 +244,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
@Override
|
@Override
|
||||||
public void handlePacket(final Packet packet) {
|
public void handlePacket(final Packet packet) {
|
||||||
channel.confirm(packet);
|
channel.confirm(packet);
|
||||||
callExecutor.execute(() -> internalHandlePacket(packet));
|
|
||||||
|
// This method will call onMessagePacket through an actor
|
||||||
|
packetActor.act(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalHandlePacket(final Packet packet) {
|
|
||||||
|
// this method is used as a listener on the packetActor
|
||||||
|
private void onMessagePacket(final Packet packet) {
|
||||||
byte type = packet.getType();
|
byte type = packet.getType();
|
||||||
|
|
||||||
storageManager.setContext(session.getSessionContext());
|
storageManager.setContext(session.getSessionContext());
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
||||||
|
|
||||||
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
|
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap);
|
||||||
|
|
||||||
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), protocolManager, session, server.getStorageManager(), channel);
|
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel);
|
||||||
channel.setHandler(handler);
|
channel.setHandler(handler);
|
||||||
|
|
||||||
// TODO - where is this removed?
|
// TODO - where is this removed?
|
||||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class InVMConnector extends AbstractConnector {
|
public class InVMConnector extends AbstractConnector {
|
||||||
|
|
|
@ -79,7 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -421,6 +422,8 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
|
|
||||||
void removeClientConnection(String clientId);
|
void removeClientConnection(String clientId);
|
||||||
|
|
||||||
|
Executor getThreadPool();
|
||||||
|
|
||||||
AddressInfo getAddressInfo(SimpleString address);
|
AddressInfo getAddressInfo(SimpleString address);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -164,7 +164,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
||||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
||||||
import org.apache.activemq.artemis.utils.TimeUtils;
|
import org.apache.activemq.artemis.utils.TimeUtils;
|
||||||
|
@ -649,6 +649,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
externalComponents.add(externalComponent);
|
externalComponents.add(externalComponent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ExecutorService getThreadPool() {
|
public ExecutorService getThreadPool() {
|
||||||
return threadPool;
|
return threadPool;
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -138,7 +138,7 @@ import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.Env;
|
import org.apache.activemq.artemis.utils.Env;
|
||||||
import org.apache.activemq.artemis.utils.FileUtil;
|
import org.apache.activemq.artemis.utils.FileUtil;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
|
|
@ -52,7 +52,7 @@ import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEnco
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.io.File;
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,7 @@ import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
|
||||||
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContex
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
Loading…
Reference in New Issue