ARTEMIS-4207 Improving redistribution fix over large messages

This commit is contained in:
Clebert Suconic 2023-04-08 11:02:57 -04:00 committed by clebertsuconic
parent d23ced586d
commit e368dacc78
10 changed files with 36 additions and 30 deletions

View File

@ -101,6 +101,11 @@ public class NullStorageManager implements StorageManager {
});
}
public NullStorageManager(int nextId) {
this();
this.setNextId(nextId);
}
@Override
public void criticalError(Throwable error) {
ioCriticalErrorListener.onIOException(error, error.getMessage(), null);

View File

@ -190,8 +190,7 @@ public interface PostOffice extends ActiveMQComponent {
MessageReference reload(Message message, Queue queue, Transaction tx) throws Exception;
Pair<RoutingContext, Message> redistribute(Message message,
Queue originatingQueue,
Transaction tx) throws Exception;
Queue originatingQueue) throws Exception;
void processRoute(Message message, RoutingContext context, boolean direct) throws Exception;

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@ -45,8 +46,8 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@ -76,7 +77,7 @@ public final class BindingsImpl implements Bindings {
private final SimpleString name;
private final IDGenerator idGenerator;
private final StorageManager storageManager;
private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
@ -85,9 +86,9 @@ public final class BindingsImpl implements Bindings {
*/
private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) {
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, StorageManager storageManager) {
this.groupingHandler = groupingHandler;
this.idGenerator = idGenerator;
this.storageManager = storageManager;
this.name = name;
}
@ -235,12 +236,16 @@ public final class BindingsImpl implements Bindings {
// The message needs a new ID during the redistribution
// We have to create the new ID only after we can guarantee it will be routed
// otherwise we may leave large messages stranded in the folder
final Message copyRedistribute = message.copy(idGenerator.generateID());
final Message copyRedistribute = message.copy(storageManager.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());
if (context.getTransaction() == null) {
context.setTransaction(new TransactionImpl(storageManager));
}
bindingIndex.setIndex(nextPosition);
nextBinding.route(copyRedistribute, context);
return copyRedistribute;

View File

@ -1387,12 +1387,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
*/
@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
final Queue originatingQueue) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
if (bindings != null && bindings.allowRedistribute()) {
RoutingContext context = new RoutingContextImpl(tx);
RoutingContext context = new RoutingContextImpl(null);
// the redistributor will make a copy of the message if it can be redistributed
Message redistributedMessage = bindings.redistribute(message, originatingQueue, context);

View File

@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -114,23 +113,23 @@ public class Redistributor implements Consumer {
return HandleStatus.NO_MATCH;
}
final Transaction tx = new TransactionImpl(storageManager);
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue);
if (routingInfo == null) {
logger.debug("postOffice.redistribute return null for message {}", reference);
tx.rollback();
return HandleStatus.BUSY;
}
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
RoutingContext context = routingInfo.getA();
Message message = routingInfo.getB();
postOffice.processRoute(message, context, false);
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
}
ackRedistribution(reference, tx);
ackRedistribution(reference, context.getTransaction());
return HandleStatus.HANDLED;
}

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -31,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@ -52,7 +52,7 @@ public class WildcardAddressManagerHeirarchyPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
@ -54,7 +54,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -45,7 +46,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Test;
public class BindingsImplTest extends ActiveMQTestBase {
@ -55,7 +55,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(1, fake.routedCount.get());
@ -66,7 +66,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@ -77,7 +77,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@ -102,7 +102,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -34,7 +35,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Ignore;
import org.junit.Test;
@ -133,7 +133,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}

View File

@ -251,8 +251,7 @@ public class FakePostOffice implements PostOffice {
@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
final Queue originatingQueue) throws Exception {
return null;
}