ARTEMIS-3575 - ensure message owner is set from journal reload, follow up on ARTEMIS-3067

This commit is contained in:
gtully 2021-11-16 11:55:52 +00:00 committed by clebertsuconic
parent abf82bc851
commit 778ab4419f
2 changed files with 85 additions and 0 deletions

View File

@ -1258,6 +1258,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override @Override
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception { public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
message.setOwner(pagingManager.getPageStore(message.getAddressSimpleString()));
MessageReference reference = MessageReference.Factory.createReference(message, queue); MessageReference reference = MessageReference.Factory.createReference(message, queue);
Long scheduledDeliveryTime; Long scheduledDeliveryTime;

View File

@ -693,6 +693,90 @@ public class AddressControlTest extends ManagementTestBase {
} }
} }
@Test
public void testAddressSizeAfterRestart() throws Exception {
session.close();
server.stop();
server.getConfiguration().setPersistenceEnabled(true);
SimpleString address = RandomUtil.randomSimpleString();
server.start();
ServerLocator locator2 = createInVMNonHALocator();
addServerLocator(locator2);
ClientSessionFactory sf2 = createSessionFactory(locator2);
session = sf2.createSession(false, true, false);
session.start();
session.createQueue(new QueueConfiguration(address));
ClientProducer producer = session.createProducer(address);
final int numMessages = 10;
final int payLoadSize = 896;
for (int i = 0; i < numMessages; i++) {
ClientMessage msg = session.createMessage(true);
msg.getBodyBuffer().writeBytes(new byte[payLoadSize]);
producer.send(msg);
}
session.commit();
AddressControl addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.getAddressSize() > numMessages * payLoadSize );
// restart to reload journal
server.stop();
server.start();
addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.getAddressSize() > numMessages * payLoadSize );
}
@Test
public void testAddressSizeAfterRestartWithPaging() throws Exception {
session.close();
server.stop();
server.getConfiguration().setPersistenceEnabled(true);
final int payLoadSize = 896;
final int pageLimitNumberOfMessages = 4;
SimpleString address = RandomUtil.randomSimpleString();
AddressSettings addressSettings = new AddressSettings().setPageSizeBytes(payLoadSize * 2).setMaxSizeBytes(payLoadSize * pageLimitNumberOfMessages);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
server.start();
ServerLocator locator2 = createInVMNonHALocator();
addServerLocator(locator2);
ClientSessionFactory sf2 = createSessionFactory(locator2);
session = sf2.createSession(false, true, false);
session.start();
session.createQueue(new QueueConfiguration(address));
ClientProducer producer = session.createProducer(address);
final int numMessages = 8;
for (int i = 0; i < numMessages; i++) {
ClientMessage msg = session.createMessage(true);
msg.getBodyBuffer().writeBytes(new byte[payLoadSize]);
producer.send(msg);
}
session.commit();
AddressControl addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize );
final long exactSizeValueBeforeRestart = addressControl.getAddressSize();
// restart to reload journal
server.stop();
server.start();
addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize );
Assert.assertEquals(exactSizeValueBeforeRestart, addressControl.getAddressSize());
}
@Override @Override