diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 3f2771af5b..e41f0dd63d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -101,6 +101,11 @@ public class NullStorageManager implements StorageManager { }); } + public NullStorageManager(int nextId) { + this(); + this.setNextId(nextId); + } + @Override public void criticalError(Throwable error) { ioCriticalErrorListener.onIOException(error, error.getMessage(), null); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 4c368be6b9..67e830d9da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -190,8 +190,7 @@ public interface PostOffice extends ActiveMQComponent { MessageReference reload(Message message, Queue queue, Transaction tx) throws Exception; Pair redistribute(Message message, - Queue originatingQueue, - Transaction tx) throws Exception; + Queue originatingQueue) throws Exception; void processRoute(Message message, RoutingContext context, boolean direct) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 2698af3f1f..a7f279fbd8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -45,8 +46,8 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.Proposal; import org.apache.activemq.artemis.core.server.group.impl.Response; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.CompositeAddress; -import org.apache.activemq.artemis.utils.IDGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -76,7 +77,7 @@ public final class BindingsImpl implements Bindings { private final SimpleString name; - private final IDGenerator idGenerator; + private final StorageManager storageManager; private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE); @@ -85,9 +86,9 @@ public final class BindingsImpl implements Bindings { */ private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet()); - public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) { + public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, StorageManager storageManager) { this.groupingHandler = groupingHandler; - this.idGenerator = idGenerator; + this.storageManager = storageManager; this.name = name; } @@ -235,12 +236,16 @@ public final class BindingsImpl implements Bindings { // The message needs a new ID during the redistribution // We have to create the new ID only after we can guarantee it will be routed // otherwise we may leave large messages stranded in the folder - final Message copyRedistribute = message.copy(idGenerator.generateID()); + final Message copyRedistribute = message.copy(storageManager.generateID()); if (logger.isDebugEnabled()) { logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID()); } copyRedistribute.setAddress(message.getAddress()); + if (context.getTransaction() == null) { + context.setTransaction(new TransactionImpl(storageManager)); + } + bindingIndex.setIndex(nextPosition); nextBinding.route(copyRedistribute, context); return copyRedistribute; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index c926bbc8b6..6bf72f9948 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1387,12 +1387,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding */ @Override public Pair redistribute(final Message message, - final Queue originatingQueue, - final Transaction tx) throws Exception { + final Queue originatingQueue) throws Exception { Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); if (bindings != null && bindings.allowRedistribute()) { - RoutingContext context = new RoutingContextImpl(tx); + RoutingContext context = new RoutingContextImpl(null); // the redistributor will make a copy of the message if it can be redistributed Message redistributedMessage = bindings.redistribute(message, originatingQueue, context); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 63724226bd..ac4992e18e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.ReusableLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,23 +113,23 @@ public class Redistributor implements Consumer { return HandleStatus.NO_MATCH; } - final Transaction tx = new TransactionImpl(storageManager); - - final Pair routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); + final Pair routingInfo = postOffice.redistribute(reference.getMessage(), queue); if (routingInfo == null) { logger.debug("postOffice.redistribute return null for message {}", reference); - tx.rollback(); return HandleStatus.BUSY; } - postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); + RoutingContext context = routingInfo.getA(); + Message message = routingInfo.getB(); + + postOffice.processRoute(message, context, false); if (RefCountMessage.isRefTraceEnabled()) { RefCountMessage.deferredDebug(reference.getMessage(), "redistributing"); } - ackRedistribution(reference, tx); + ackRedistribution(reference, context.getTransaction()); return HandleStatus.HANDLED; } diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java index 829abe9db6..8b791146f7 100644 --- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -31,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; @@ -52,7 +52,7 @@ public class WildcardAddressManagerHeirarchyPerfTest { @Override public Bindings createBindings(SimpleString address) { - return new BindingsImpl(address, null, new SimpleIDGenerator(1000)); + return new BindingsImpl(address, null, new NullStorageManager(1000)); } } diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java index a8ec84ec7a..3fe51c4b4b 100644 --- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Group; @@ -54,7 +54,7 @@ public class WildcardAddressManagerPerfTest { @Override public Bindings createBindings(SimpleString address) { - return new BindingsImpl(address, null, new SimpleIDGenerator(1000)); + return new BindingsImpl(address, null, new NullStorageManager(1000)); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 624a641375..26a37c866a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -45,7 +46,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.selector.filter.Filterable; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.junit.Test; public class BindingsImplTest extends ActiveMQTestBase { @@ -55,7 +55,7 @@ public class BindingsImplTest extends ActiveMQTestBase { final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a")); fake.filter = null; // such that it wil match all messages fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND; - final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000)); + final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000)); bind.addBinding(fake); bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction())); assertEquals(1, fake.routedCount.get()); @@ -66,7 +66,7 @@ public class BindingsImplTest extends ActiveMQTestBase { final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a")); fake.filter = null; // such that it wil match all messages fake.messageLoadBalancingType = MessageLoadBalancingType.OFF; - final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000)); + final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000)); bind.addBinding(fake); bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction())); assertEquals(0, fake.routedCount.get()); @@ -77,7 +77,7 @@ public class BindingsImplTest extends ActiveMQTestBase { final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a")); fake.filter = null; // such that it wil match all messages fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION; - final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000)); + final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000)); bind.addBinding(fake); bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction())); assertEquals(0, fake.routedCount.get()); @@ -102,7 +102,7 @@ public class BindingsImplTest extends ActiveMQTestBase { private void internalTest(final boolean route) throws Exception { final FakeBinding fake = new FakeBinding(new SimpleString("a")); - final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000)); + final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000)); bind.addBinding(fake); bind.addBinding(new FakeBinding(new SimpleString("a"))); bind.addBinding(new FakeBinding(new SimpleString("a"))); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java index 874cf6a8fb..b9858f788b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -34,7 +35,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.junit.Ignore; import org.junit.Test; @@ -133,7 +133,7 @@ public class WildcardAddressManagerPerfTest { @Override public Bindings createBindings(SimpleString address) throws Exception { - return new BindingsImpl(address, null, new SimpleIDGenerator(1000)); + return new BindingsImpl(address, null, new NullStorageManager(1000)); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index c42914dce5..591fabbb4b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -251,8 +251,7 @@ public class FakePostOffice implements PostOffice { @Override public Pair redistribute(final Message message, - final Queue originatingQueue, - final Transaction tx) throws Exception { + final Queue originatingQueue) throws Exception { return null; }