ARTEMIS-4784 Large Messages should removed from HashMap on ReplicatioEndpoint after being closed

This commit is contained in:
Clebert Suconic 2024-05-28 16:08:17 -04:00 committed by clebertsuconic
parent 0ca36b7b38
commit fa06b70134
3 changed files with 106 additions and 20 deletions

View File

@ -608,7 +608,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleLargeMessageEnd on {}", packet.getMessageId());
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false);
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) {
if (!packet.isDelete()) {
if (logger.isTraceEnabled()) {
@ -644,12 +644,15 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
private ReplicatedLargeMessage lookupLargeMessage(final long messageId,
final boolean delete,
final boolean remove,
final boolean createIfNotExists) {
ReplicatedLargeMessage message;
if (delete) {
if (remove) {
message = largeMessages.remove(messageId);
if (message == null) {
return newLargeMessage(messageId, false);
}
} else {
message = largeMessages.get(messageId);
if (message == null) {
@ -657,8 +660,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
createLargeMessage(messageId, false);
message = largeMessages.get(messageId);
} else {
// No warnings if it's a delete, as duplicate deletes may be sent repeatedly.
ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
return newLargeMessage(messageId, false);
}
}
}
@ -679,6 +681,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
private void createLargeMessage(final long id, boolean liveToBackupSync) {
ReplicatedLargeMessage msg = newLargeMessage(id, liveToBackupSync);
largeMessages.put(id, msg);
}
private ReplicatedLargeMessage newLargeMessage(long id, boolean liveToBackupSync) {
ReplicatedLargeMessage msg;
if (liveToBackupSync) {
msg = new LargeServerMessageInSync(storageManager);
@ -688,7 +695,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
msg.setDurable(true);
msg.setMessageID(id);
largeMessages.put(id, msg);
return msg;
}
/**

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -82,7 +83,6 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
ActiveMQServer server = primaryServer.getServer();
boolean crashServer = true;
int size = 100 * 1024;
AmqpClient client = createAmqpClient(new URI(smallFrameLive));
AmqpConnection connection = client.createConnection();
@ -115,21 +115,19 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
if (crashServer) {
connection.close();
primaryServer.crash();
connection.close();
primaryServer.crash();
Wait.assertTrue(backupServer::isActive);
Wait.assertTrue(backupServer::isActive);
server = backupServer.getServer();
server = backupServer.getServer();
client = createAmqpClient(new URI(smallFrameBackup));
connection = client.createConnection();
addConnection(connection);
connection.setMaxFrameSize(2 * 1024);
connection.connect();
session = connection.createSession();
}
client = createAmqpClient(new URI(smallFrameBackup));
connection = client.createConnection();
addConnection(connection);
connection.setMaxFrameSize(2 * 1024);
connection.connect();
session = connection.createSession();
queueView = server.locateQueue(getQueueName());
Wait.assertEquals(100, queueView::getMessageCount);
@ -163,4 +161,77 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
}
}
@Test(timeout = 60_000)
public void testCloseFilesOnTarget() throws Exception {
try {
ActiveMQServer server = primaryServer.getServer();
int size = 100 * 1024;
AmqpClient client = createAmqpClient(new URI(smallFrameLive));
AmqpConnection connection = client.createConnection();
addConnection(connection);
connection.setMaxFrameSize(2 * 1024);
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName().toString());
Queue queueView = server.locateQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
session.begin();
for (int m = 0; m < 100; m++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("i", "m " + m);
byte[] bytes = new byte[size];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) 'z';
}
message.setBytes(bytes);
sender.send(message);
}
session.commit();
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
queueView = server.locateQueue(getQueueName());
Wait.assertEquals(100, queueView::getMessageCount);
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupServer.getServer().getActivation();
Wait.assertEquals(0, () -> activation.getReplicationEndpoint().getLargeMessages().size(), 5000);
AmqpReceiver receiver = session.createReceiver(getQueueName().toString());
receiver.flow(100);
for (int i = 0; i < 100; i++) {
AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msgReceived);
Data body = (Data)msgReceived.getWrappedMessage().getBody();
byte[] bodyArray = body.getValue().getArray();
for (int bI = 0; bI < size; bI++) {
Assert.assertEquals((byte)'z', bodyArray[bI]);
}
msgReceived.accept(true);
}
receiver.flow(1);
Assert.assertNull(receiver.receiveNoWait());
receiver.close();
connection.close();
Wait.assertEquals(0, queueView::getMessageCount);
validateNoFilesOnLargeDir(getLargeMessagesDir(0, false), 0);
validateNoFilesOnLargeDir(getLargeMessagesDir(0, true), 0);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}

View File

@ -92,6 +92,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
@ -576,13 +577,20 @@ public final class ReplicationTest extends ActiveMQTestBase {
CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
LargeServerMessage largeMsg = liveServer.getStorageManager().createCoreLargeMessage(500, msg);
largeMsg.addBytes(new byte[1024]);
largeMsg.releaseResources(true, true);
blockOnReplication(storage, manager);
LargeServerMessageImpl message1 = (LargeServerMessageImpl) getReplicationEndpoint(backupServer).getLargeMessages().get(500L);
Assert.assertNotNull(message1);
Assert.assertTrue(largeMsg.getAppendFile().isOpen());
Assert.assertTrue(message1.getAppendFile().isOpen());
largeMsg.releaseResources(true, true);
Wait.assertTrue(() -> getReplicationEndpoint(backupServer).getLargeMessages().get(500L) == null, 5000);
Assert.assertFalse(largeMsg.getAppendFile().isOpen());
Assert.assertFalse(message1.getAppendFile().isOpen());
}