ARTEMIS-3238 AMQP Mirror not routing correctly with SNF

This commit is contained in:
Clebert Suconic 2021-04-12 14:36:49 -04:00 committed by clebertsuconic
parent f580ecb56f
commit d408f284b1
5 changed files with 145 additions and 39 deletions

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
@ -204,11 +205,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
connectSender(queue, queue.getAddress().toString(), Symbol.valueOf("qd.waypoint"));
connectSender(queue, queue.getAddress().toString(), null, Symbol.valueOf("qd.waypoint"));
connectReceiver(protonRemotingConnection, session, sessionContext, queue, Symbol.valueOf("qd.waypoint"));
} else {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString());
connectSender(queue, queue.getAddress().toString(), null);
}
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
@ -278,7 +279,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement;
Queue queue = server.locateQueue(replica.getSourceMirrorAddress());
connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS);
connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS, (r) -> AMQPMirrorControllerSource.validateProtocolData(r, replica.getSourceMirrorAddress()));
}
}
}
@ -457,6 +458,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private void connectSender(Queue queue,
String targetName,
java.util.function.Consumer<? super MessageReference> beforeDeliver,
Symbol... capabilities) {
if (logger.isDebugEnabled()) {
logger.debug("Connecting outbound for " + queue);
@ -491,7 +493,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
AMQPOutgoingController outgoingInitializer = new AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI());
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer);
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
sessionContext.addSender(sender, senderContext);
} catch (Exception e) {

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -155,28 +156,42 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
snfQueue.refUp(ref);
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
daMap.put(INTERNAL_ID, message.getMessageID());
String address = message.getAddress();
if (address != null) { // this is the message that was set through routing
Properties amqpProperties = getProperties(message);
if (amqpProperties == null || !address.equals(amqpProperties.getTo())) {
// We set the internal destination property only if we need to
// otherwise we just use the one already set over Properties
daMap.put(INTERNAL_DESTINATION, message.getAddress());
}
}
ref.setProtocolData(deliveryAnnotations);
refs.add(ref);
message.usageUp();
setProtocolData(ref);
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
public static void validateProtocolData(MessageReference ref, SimpleString snfAddress) {
if (ref.getProtocolData() == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
setProtocolData(ref);
}
}
private static void setProtocolData(MessageReference ref) {
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
daMap.put(INTERNAL_ID, ref.getMessage().getMessageID());
String address = ref.getMessage().getAddress();
if (address != null) { // this is the message that was set through routing
Properties amqpProperties = getProperties(ref.getMessage());
if (amqpProperties == null || !address.equals(amqpProperties.getTo())) {
// We set the internal destination property only if we need to
// otherwise we just use the one already set over Properties
daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
}
}
ref.setProtocolData(deliveryAnnotations);
}
private static Properties getProperties(Message message) {
if (message instanceof AMQPMessage) {
return AMQPMessageBrokerAccessor.getCurrentProperties((AMQPMessage)message);

View File

@ -135,6 +135,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
* */
private final Object creditsLock = new Object();
private final java.util.function.Consumer<? super MessageReference> executeDelivery;
private java.util.function.Consumer<? super MessageReference> beforeDelivery;
private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
public ProtonServerSenderContext(AMQPConnectionContext connection,
@ -160,6 +161,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
.isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
}
public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer<? super MessageReference> beforeDelivery) {
this.beforeDelivery = beforeDelivery;
return this;
}
public Object getBrokerConsumer() {
return brokerConsumer;
}
@ -492,6 +498,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return 0;
}
if (beforeDelivery != null) {
beforeDelivery.accept(messageReference);
}
try {
synchronized (creditsLock) {
if (sender.getLocalState() == EndpointState.CLOSED) {

View File

@ -1519,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
confirmLargeMessageSend(storageManager, tx, message);
}
// We need to kick delivery so the Queues may check for the cursors case they are empty
@ -1595,24 +1595,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
refs.add(reference);
queue.refUp(reference);
if (message.isDurable()) {
final int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
storageManager.storeMessage(message);
}
if (message.isLargeMessage()) {
confirmLargeMessageSend(tx, message);
}
}
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
tx.setContainsPersistent();
} else {
final boolean last = i == (durableQueuesCount - 1);
storageManager.storeReference(queue.getID(), message.getMessageID(), last);
}
storeDurableReference(storageManager, message, tx, queue, durableQueuesCount == i);
if (deliveryTime != null && deliveryTime > 0) {
if (tx != null) {
storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
@ -1624,12 +1607,36 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
public static void storeDurableReference(StorageManager storageManager, Message message,
Transaction tx,
Queue queue, boolean sync) throws Exception {
assert message.isDurable();
final int durableRefCount = queue.durableUp(message);
if (durableRefCount == 1) {
if (tx != null) {
storageManager.storeMessageTransactional(tx.getID(), message);
} else {
storageManager.storeMessage(message);
}
if (message.isLargeMessage()) {
confirmLargeMessageSend(storageManager, tx, message);
}
}
if (tx != null) {
storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
tx.setContainsPersistent();
} else {
storageManager.storeReference(queue.getID(), message.getMessageID(), sync);
}
}
/**
* @param tx
* @param message
* @throws Exception
*/
private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception {
private static void confirmLargeMessageSend(StorageManager storageManager, Transaction tx, final Message message) throws Exception {
LargeServerMessage largeServerMessage = (LargeServerMessage) message;
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {

View File

@ -446,6 +446,78 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
}
@Test
public void testRouteSurviving() throws Exception {
testRouteSurvivor(false);
}
@Test
public void testRouteSurvivingStop() throws Exception {
testRouteSurvivor(true);
}
private void testRouteSurvivor(boolean server1Stopped) throws Exception {
if (!server1Stopped) {
server.start();
}
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("OtherSide", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setSourceMirrorAddress("TheSource");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.start();
// We create the address to avoid auto delete on the queue
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
int NUMBER_OF_MESSAGES = 200;
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("i=" + i));
}
connection.close();
{
if (!server1Stopped) {
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Queue queueServer1 = server.locateQueue(getQueueName());
Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount);
}
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Queue queueServer2 = server_2.locateQueue(getQueueName());
Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
}
if (!server1Stopped) {
server.stop();
}
server_2.stop();
server.start();
server_2.start();
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Queue queueServer1 = server.locateQueue(getQueueName());
Queue queueServer2 = server_2.locateQueue(getQueueName());
Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
}
private void replicaTest(boolean largeMessage,