ARTEMIS-4207 Improved fix on LargeMessage Redistribution

This fix will delay the message.copy to the redistributor itself.
Meaning no copy would be performed if the redistribution itself failed.

No need to remove a copy any longer
This commit is contained in:
Clebert Suconic 2023-03-27 09:53:07 -04:00 committed by clebertsuconic
parent d139ad75c2
commit 15d39a14ea
9 changed files with 86 additions and 46 deletions

View File

@ -42,7 +42,15 @@ public interface Bindings extends UnproposalListener {
MessageLoadBalancingType getMessageLoadBalancingType();
boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
/**
*
* @param message the message being copied
* @param originatingQueue
* @param context
* @return a Copy of the message if redistribution succeeded, or null if it wasn't redistributed
* @throws Exception
*/
Message redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
void route(Message message, RoutingContext context) throws Exception;

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@ -75,6 +76,8 @@ public final class BindingsImpl implements Bindings {
private final SimpleString name;
private final IDGenerator idGenerator;
private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
/**
@ -82,8 +85,9 @@ public final class BindingsImpl implements Bindings {
*/
private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) {
this.groupingHandler = groupingHandler;
this.idGenerator = idGenerator;
this.name = name;
}
@ -114,8 +118,6 @@ public final class BindingsImpl implements Bindings {
}
}
@Override
public void addBinding(final Binding binding) {
try {
@ -181,12 +183,12 @@ public final class BindingsImpl implements Bindings {
}
@Override
public boolean redistribute(final Message message,
public Message redistribute(final Message message,
final Queue originatingQueue,
final RoutingContext context) throws Exception {
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
if (loadBalancingType.equals(MessageLoadBalancingType.STRICT) || loadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
return null;
}
logger.trace("Redistributing message {}", message);
@ -198,7 +200,7 @@ public final class BindingsImpl implements Bindings {
if (bindingsAndPosition == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
return false;
return null;
}
final Binding[] bindings = bindingsAndPosition.getA();
@ -227,11 +229,21 @@ public final class BindingsImpl implements Bindings {
}
}
if (nextBinding == null) {
return false;
return null;
}
// The message needs a new ID during the redistribution
// We have to create the new ID only after we can guarantee it will be routed
// otherwise we may leave large messages stranded in the folder
final Message copyRedistribute = message.copy(idGenerator.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());
bindingIndex.setIndex(nextPosition);
nextBinding.route(message, context);
return true;
nextBinding.route(copyRedistribute, context);
return copyRedistribute;
}
@Override

View File

@ -1392,35 +1392,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
final Message copyRedistribute = message.copy(storageManager.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());
RoutingContext context = new RoutingContextImpl(tx);
boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);
if (routed) {
return new Pair<>(context, copyRedistribute);
} else {
// things have changed, we are not redistributing any more
if (copyRedistribute.isLargeMessage()) {
LargeServerMessage lsm = (LargeServerMessage) copyRedistribute;
postOfficeExecutor.execute(() -> {
try {
logger.debug("Removing large message {} since the routing tables have changed", lsm.getAppendFile());
lsm.deleteFile();
} catch (Exception e) {
logger.warn("Error removing {}", copyRedistribute);
}
});
}
// the redistributor will make a copy of the message if it can be redistributed
Message redistributedMessage = bindings.redistribute(message, originatingQueue, context);
if (redistributedMessage != null) {
return new Pair<>(context, redistributedMessage);
}
}
@ -2101,7 +2079,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public Bindings createBindings(final SimpleString address) {
GroupingHandler groupingHandler = server.getGroupingHandler();
BindingsImpl bindings = new BindingsImpl(CompositeAddress.extractAddressName(address), groupingHandler);
BindingsImpl bindings = new BindingsImpl(CompositeAddress.extractAddressName(address), groupingHandler, storageManager);
if (groupingHandler != null) {
groupingHandler.addListener(bindings);
}

View File

@ -62,4 +62,42 @@ public class LargeMessageRedistributionTest extends MessageRedistributionTest {
Wait.assertEquals(0, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(numMessages, () -> getServer(1).getConfiguration().getLargeMessagesLocation().listFiles().length);
}
@Test
public void testRedistributionLargeMessageDirCleanup2() throws Exception {
final long delay = 0;
final int numMessages = 5;
setRedistributionDelay(delay);
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
send(0, "queues.testaddress", numMessages, true, null);
addConsumer(0, 0, "queue0", null);
verifyReceiveAll(numMessages, 0);
removeConsumer(0);
addConsumer(1, 1, "queue0", null);
verifyReceiveAll(numMessages, 1);
servers[1].stop();
send(0, "queues.testaddress", numMessages, true, null);
Wait.assertEquals(5, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(numMessages, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@ -51,7 +52,7 @@ public class WildcardAddressManagerHeirarchyPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
@ -53,7 +54,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Test;
public class BindingsImplTest extends ActiveMQTestBase {
@ -54,7 +55,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(1, fake.routedCount.get());
@ -65,7 +66,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@ -76,7 +77,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@ -101,7 +102,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
final Bindings bind = new BindingsImpl(null, null);
final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Ignore;
import org.junit.Test;
@ -132,7 +133,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsImpl(address, null);
return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
}
}

View File

@ -478,10 +478,10 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
@Override
public boolean redistribute(Message message,
public Message redistribute(Message message,
Queue originatingQueue,
RoutingContext context) throws Exception {
return false;
return null;
}
@Override