This commit is contained in:
Clebert Suconic 2024-08-28 12:56:42 -04:00 committed by clebertsuconic
parent 4ba9f67d80
commit 4352ebae5f
7 changed files with 48 additions and 50 deletions

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
@ -180,6 +181,15 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private AckManager ackManager;
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
AckManager localAckManager = ackManager;
if (localAckManager != null) {
localAckManager.unregisterMirror(this);
}
}
/** This method will wait both replication and storage to finish their current operations. */
public void flush() {
CountDownLatch latch = new CountDownLatch(1);
@ -208,7 +218,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
try {
timeout = connection.getProtocolManager().getAckManagerFlushTimeout();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
// This is redundant code that should not occur
// Only real possibility for this would be a Mocking test, or some embedded usage
logger.warn("Could not access the connection and protocol manager, using a default timeout of 10 seconds for AckManagerFlushTimeout", e);
timeout = 10_000;
}
@ -217,7 +229,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
ActiveMQAMQPProtocolLogger.LOGGER.timedOutAckManager(timeout);
}
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
ActiveMQAMQPProtocolLogger.LOGGER.interruptedAckManager(e);
Thread.currentThread().interrupt();
}
}
@ -457,6 +469,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (ackManager == null) {
ackManager = AckManagerProvider.getManager(server);
ackManager.registerMirror(this);
}
ackManager.ack(nodeID, targetQueue, messageID, reason, true);

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,6 +64,7 @@ public class AckManager implements ActiveMQComponent {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final Set<AMQPMirrorControllerTarget> mirrorControllerTargets = new HashSet<>();
final LongSupplier sequenceGenerator;
final JournalHashMapProvider<AckRetry, AckRetry, Queue> journalHashMapProvider;
final ActiveMQServer server;
@ -149,7 +151,7 @@ public class AckManager implements ActiveMQComponent {
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retries = sortRetries();
scanAndFlushMirrorTargets();
flushMirrorTargets();
if (retries.isEmpty()) {
logger.trace("Nothing to retry!, server={}", server);
@ -160,19 +162,17 @@ public class AckManager implements ActiveMQComponent {
return true;
}
private void scanAndFlushMirrorTargets() {
logger.debug("scanning and flushing mirror targets");
// this will navigate on each connection, find the connection that has a mirror controller, and call flushMirrorTarget for each MirrorTargets. (it should be 1 in most cases)
// An alternative design instead of going through the connections, would be to register the MirrorTargets within the AckManager, however to avoid memory leaks after disconnects and reconnects it is safer to
// scan through the connections
server.getRemotingService().getConnections().stream().
filter(c -> c instanceof ActiveMQProtonRemotingConnection && ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets() != null).
forEach(c -> ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets().forEach(this::flushMirrorTarget));
public synchronized void registerMirror(AMQPMirrorControllerTarget mirrorTarget) {
this.mirrorControllerTargets.add(mirrorTarget);
}
private void flushMirrorTarget(AMQPMirrorControllerTarget target) {
logger.debug("Flushing mirror {}", target);
target.flush();
public synchronized void unregisterMirror(AMQPMirrorControllerTarget mirrorTarget) {
this.mirrorControllerTargets.remove(mirrorTarget);
}
private synchronized void flushMirrorTargets() {
logger.debug("scanning and flushing mirror targets");
mirrorControllerTargets.forEach(AMQPMirrorControllerTarget::flush);
}
// Sort the ACK list by address

View File

@ -61,4 +61,7 @@ public interface ActiveMQAMQPProtocolLogger {
@LogMessage(id = 111008, value = "The AckManager timed out waiting for operations to complete on the MirrorTarget. timeout = {} milliseconds", level = LogMessage.Level.WARN)
void timedOutAckManager(long timeout);
@LogMessage(id = 111009, value = "The AckManager was interrupt. timeout = {} milliseconds", level = LogMessage.Level.WARN)
void interruptedAckManager(Exception e);
}

View File

@ -17,11 +17,9 @@
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -46,7 +44,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationAddressSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
@ -111,8 +108,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
flush();
}
private List<AMQPMirrorControllerTarget> mirrorControllerTargets;
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
@ -203,18 +198,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
public List<AMQPMirrorControllerTarget> getMirrorControllerTargets() {
return mirrorControllerTargets;
}
public AMQPConnectionContext addMirrorControllerTarget(AMQPMirrorControllerTarget mirrorControllerTarget) {
if (mirrorControllerTargets == null) {
mirrorControllerTargets = new ArrayList<>();
}
mirrorControllerTargets.add(mirrorControllerTarget);
return this;
}
public boolean isLargeMessageSync() {
return connectionCallback.isLargeMessageSync();
}

View File

@ -314,8 +314,6 @@ public class AMQPSessionContext extends ProtonInitializable {
final AMQPMirrorControllerTarget protonReceiver =
new AMQPMirrorControllerTarget(sessionSPI, connection, this, receiver, server);
connection.addMirrorControllerTarget(protonReceiver);
final HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, server.getNodeID().toString());
receiver.setProperties(brokerIDProperties);

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
public final class AckRetry {
String nodeID;
byte[] temporaryNodeBytes;
byte[] nodeIDBytes;
long messageID;
AckReason reason;
int pageAttempts;
@ -55,10 +55,10 @@ public final class AckRetry {
public synchronized byte[] getNodeIDBytes() {
if (temporaryNodeBytes == null) {
temporaryNodeBytes = nodeID.getBytes(StandardCharsets.US_ASCII);
if (nodeIDBytes == null) {
nodeIDBytes = nodeID.getBytes(StandardCharsets.US_ASCII);
}
return temporaryNodeBytes;
return nodeIDBytes;
}
public String getNodeID() {

View File

@ -51,7 +51,6 @@ import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@ -59,6 +58,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
@ -328,7 +328,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < NUMBER_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(30_000);
Assertions.assertNotNull(message);
assertNotNull(message);
receivedIDs.add(message.getIntProperty("i"));
if (i > 0 && i % SEND_COMMIT == 0) {
logger.info("Received {} messages", i);
@ -339,9 +339,9 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
session.commit();
}
Assertions.assertEquals(NUMBER_MESSAGES, receivedIDs.size());
assertEquals(NUMBER_MESSAGES, receivedIDs.size());
for (int i = 0; i < NUMBER_MESSAGES; i++) {
Assertions.assertTrue(receivedIDs.contains(i));
assertTrue(receivedIDs.contains(i));
}
}
@ -390,7 +390,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
} else if (i == killAt) { // kill the live on DC2
logger.info("KillAt {}", killAt);
ServerUtil.waitForServerToStart(1, 10_000);
ServerUtil.waitForServerToStart(2, 10_000);
Wait.assertTrue(managementDC2::isReplicaSync);
processDC2.destroyForcibly();
assertTrue(processDC2.waitFor(10, TimeUnit.SECONDS));
@ -401,9 +401,10 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
message.setIntProperty("i", i);
producer.send(message);
TextMessage textMessage = (TextMessage) consumer.receive(5000);
Assertions.assertNotNull(textMessage);
Assertions.assertEquals(text, textMessage.getText());
assertNotNull(textMessage);
assertEquals(text, textMessage.getText());
}
}
final int oddSend = 33;
@ -433,7 +434,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < oddSend; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assertions.assertNotNull(message);
assertNotNull(message);
assertEquals("oddSend " + i, message.getText());
}
assertNull(consumer.receiveNoWait());
@ -486,7 +487,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
});
}
Assertions.assertTrue(latch.await(5, TimeUnit.MINUTES));
assertTrue(latch.await(5, TimeUnit.MINUTES));
int openFiles = lsof();
@ -494,8 +495,8 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
// lsof is showing a file descriptor associated with multiple threads. So it is expected to have quite a few repetitions
// when the issue is happening we would have around 40k, 50k entries or a lot more if you add more messages.
Assertions.assertTrue(openFiles < 4000, () -> "There was " + openFiles + " open files");
Assertions.assertEquals(0, errors.get(), "There are errors on the senders");
assertTrue(openFiles < 4000, () -> "There was " + openFiles + " open files");
assertEquals(0, errors.get(), "There are errors on the senders");
}
@ -515,7 +516,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
filesCounter.incrementAndGet();
});
}
Assertions.assertTrue(process.waitFor(10, TimeUnit.SECONDS));
assertTrue(process.waitFor(10, TimeUnit.SECONDS));
return filesCounter.get();
}