From 778ab4419fe9b373e70b6589a1fb7cd2c550bec8 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 16 Nov 2021 11:55:52 +0000 Subject: [PATCH] ARTEMIS-3575 - ensure message owner is set from journal reload, follow up on ARTEMIS-3067 --- .../core/postoffice/impl/PostOfficeImpl.java | 1 + .../management/AddressControlTest.java | 84 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index d1db38641d..659500dec2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1258,6 +1258,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception { + message.setOwner(pagingManager.getPageStore(message.getAddressSimpleString())); MessageReference reference = MessageReference.Factory.createReference(message, queue); Long scheduledDeliveryTime; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 4ed58be88b..55c0f4dc60 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -693,6 +693,90 @@ public class AddressControlTest extends ManagementTestBase { } } + @Test + public void testAddressSizeAfterRestart() throws Exception { + session.close(); + server.stop(); + server.getConfiguration().setPersistenceEnabled(true); + + SimpleString address = RandomUtil.randomSimpleString(); + + server.start(); + ServerLocator locator2 = createInVMNonHALocator(); + addServerLocator(locator2); + ClientSessionFactory sf2 = createSessionFactory(locator2); + + session = sf2.createSession(false, true, false); + session.start(); + session.createQueue(new QueueConfiguration(address)); + + ClientProducer producer = session.createProducer(address); + + final int numMessages = 10; + final int payLoadSize = 896; + for (int i = 0; i < numMessages; i++) { + ClientMessage msg = session.createMessage(true); + msg.getBodyBuffer().writeBytes(new byte[payLoadSize]); + producer.send(msg); + } + session.commit(); + + AddressControl addressControl = createManagementControl(address); + Assert.assertTrue(addressControl.getAddressSize() > numMessages * payLoadSize ); + + // restart to reload journal + server.stop(); + server.start(); + + addressControl = createManagementControl(address); + Assert.assertTrue(addressControl.getAddressSize() > numMessages * payLoadSize ); + } + + + @Test + public void testAddressSizeAfterRestartWithPaging() throws Exception { + session.close(); + server.stop(); + server.getConfiguration().setPersistenceEnabled(true); + + final int payLoadSize = 896; + final int pageLimitNumberOfMessages = 4; + SimpleString address = RandomUtil.randomSimpleString(); + AddressSettings addressSettings = new AddressSettings().setPageSizeBytes(payLoadSize * 2).setMaxSizeBytes(payLoadSize * pageLimitNumberOfMessages); + server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); + + server.start(); + ServerLocator locator2 = createInVMNonHALocator(); + addServerLocator(locator2); + ClientSessionFactory sf2 = createSessionFactory(locator2); + + session = sf2.createSession(false, true, false); + session.start(); + session.createQueue(new QueueConfiguration(address)); + + ClientProducer producer = session.createProducer(address); + + final int numMessages = 8; + for (int i = 0; i < numMessages; i++) { + ClientMessage msg = session.createMessage(true); + msg.getBodyBuffer().writeBytes(new byte[payLoadSize]); + producer.send(msg); + } + session.commit(); + + AddressControl addressControl = createManagementControl(address); + Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize ); + + final long exactSizeValueBeforeRestart = addressControl.getAddressSize(); + + // restart to reload journal + server.stop(); + server.start(); + + addressControl = createManagementControl(address); + Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize ); + Assert.assertEquals(exactSizeValueBeforeRestart, addressControl.getAddressSize()); + } @Override