From ebf8adc72b261e7e2375f3aa96354af8a0783abd Mon Sep 17 00:00:00 2001 From: nbrendah Date: Wed, 20 Oct 2021 11:00:39 +0300 Subject: [PATCH 1/2] ARTEMIS-3523: Created delegated methods replay in addressControl --- .../api/core/management/AddressControl.java | 9 ++ .../management/impl/AddressControlImpl.java | 19 +++ .../management/AddressControlTest.java | 116 ++++++++++++++++++ .../AddressControlUsingCoreTest.java | 10 ++ 4 files changed, 154 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 539988b688..beef4c30af 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -230,4 +230,13 @@ public interface AddressControl { @Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION) long purge() throws Exception; + @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION) + void replay(@Parameter(name = "target", desc = "Where the replay data should be sent") String target, + @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; + + @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION) + void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan, + @Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan, + @Parameter(name = "target", desc = "Where the replay data should be sent") String target, + @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index e384697de0..6277db9113 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -19,8 +19,10 @@ package org.apache.activemq.artemis.core.management.impl; import javax.json.JsonArrayBuilder; import javax.management.MBeanAttributeInfo; import javax.management.MBeanOperationInfo; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.replay.ReplayManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.utils.JsonLoader; @@ -574,6 +577,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro return totalMsgs; } + @Override + public void replay(String target, String filter) throws Exception { + server.replay(null, null, this.getAddress(), target, filter); + } + + @Override + public void replay(String startScan, String endScan, String target, String filter) throws Exception { + + SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat(); + + Date startScanDate = format.parse(startScan); + Date endScanDate = format.parse(endScan); + + server.replay(startScanDate, endScanDate, this.getAddress(), target, filter); + } + // Private ------------------------------------------------------- private long getMessageCount(final DurabilityType durability) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 2095272141..a303ad6fbd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -16,10 +16,18 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.json.JsonArray; import javax.json.JsonString; +import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.EnumSet; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -47,8 +55,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; @@ -580,6 +590,111 @@ public class AddressControlTest extends ManagementTestBase { Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100); } + @Test + public void testReplayWithoutDate() throws Exception { + testReplaySimple(false); + } + + @Test + public void testReplayWithDate() throws Exception { + testReplaySimple(true); + } + + private void testReplaySimple(boolean useDate) throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + + AddressControl addressControl = createManagementControl(address); + String queue = "testQueue" + RandomUtil.randomString(); + server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = session.createQueue(queue); + MessageProducer producer = session.createProducer(jmsQueue); + producer.send(session.createTextMessage("before")); + + connection.start(); + MessageConsumer consumer = session.createConsumer(jmsQueue); + Assert.assertNotNull(consumer.receive(5000)); + Assert.assertNull(consumer.receiveNoWait()); + + addressControl.replay(queue, null); + Assert.assertNotNull(consumer.receive(5000)); + Assert.assertNull(consumer.receiveNoWait()); + + if (useDate) { + addressControl.replay("dontexist", null); // just to force a move next file, and copy stuff into place + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); + Thread.sleep(1000); // waiting a second just to have the timestamp change + String dateEnd = format.format(new Date()); + Thread.sleep(1000); // waiting a second just to have the timestamp change + String dateStart = "19800101000000"; + + + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("after receiving")); + } + for (int i = 0; i < 100; i++) { + Assert.assertNotNull(consumer.receive()); + } + Assert.assertNull(consumer.receiveNoWait()); + addressControl.replay(dateStart, dateEnd, queue, null); + for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("before", message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + } else { + addressControl.replay(queue, null); + + // replay of the replay, there will be two messages + for (int i = 0; i < 2; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + Assert.assertNull(consumer.receiveNoWait()); + } + } + } + + @Test + public void testReplayFilter() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + + AddressControl addressControl = createManagementControl(address); + String queue = "testQueue" + RandomUtil.randomString(); + server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = session.createQueue(queue); + MessageProducer producer = session.createProducer(jmsQueue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setIntProperty("i", i); + producer.send(message); + } + + connection.start(); + MessageConsumer consumer = session.createConsumer(jmsQueue); + for (int i = 0; i < 10; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + Assert.assertNull(consumer.receiveNoWait()); + + addressControl.replay(queue, "i=5"); + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(5, message.getIntProperty("i")); + Assert.assertEquals("message 5", message.getText()); + Assert.assertNull(consumer.receiveNoWait()); + } + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -607,6 +722,7 @@ public class AddressControlTest extends ManagementTestBase { // Private ------------------------------------------------------- + // Inner classes ------------------------------------------------- } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index c512a44afa..738b47dc80 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -178,6 +178,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest { return (long) proxy.invokeOperation("purge"); } + @Override + public void replay(String startScan, String endScan, String target, String filter) throws Exception { + proxy.invokeOperation("replay", startScan, endScan, target, filter); + } + + @Override + public void replay(String target, String filter) throws Exception { + proxy.invokeOperation("replay", target, filter); + } + @Override public String sendMessage(Map headers, int type, From fdc0cc591c9ab9dea22a9b65cef6c53ea0263222 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 21 Oct 2021 13:19:39 -0400 Subject: [PATCH 2/2] ARTEMIS-3523 Small tweaks into AddressControl delegation --- .../artemis/core/server/replay/ReplayManager.java | 8 +++++++- .../integration/management/AddressControlTest.java | 13 ++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java index a13609ec1c..1906f4712e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java @@ -84,7 +84,7 @@ public class ReplayManager { } } - private void actualReplay(Date start, Date end, String sourceAddress, String targetAddress, String filterStr) throws Exception { + private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Replay::" + sourceAddress); } @@ -92,6 +92,12 @@ public class ReplayManager { throw new NullPointerException("sourceAddress"); } + if (targetAddressParameter == null || targetAddressParameter.trim().isEmpty()) { + targetAddressParameter = sourceAddress; + } + + final String targetAddress = targetAddressParameter; + if (journal == null) { // notice this routing plays single threaded. no need for any sort of synchronization here journal = (JournalImpl)server.getStorageManager().getMessageJournal(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index a303ad6fbd..21dbb6a74e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -363,7 +363,6 @@ public class AddressControlTest extends ManagementTestBase { ClientSessionFactory sf2 = createSessionFactory(locator2); session = sf2.createSession(false, true, false); - session.createQueue(new QueueConfiguration(address)); Assert.assertEquals(1024, addressControl.getNumberOfBytesPerPage()); } @@ -601,12 +600,11 @@ public class AddressControlTest extends ManagementTestBase { } private void testReplaySimple(boolean useDate) throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - AddressControl addressControl = createManagementControl(address); String queue = "testQueue" + RandomUtil.randomString(); server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + AddressControl addressControl = createManagementControl(SimpleString.toSimpleString(queue)); ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); try (Connection connection = factory.createConnection()) { @@ -661,13 +659,13 @@ public class AddressControlTest extends ManagementTestBase { @Test public void testReplayFilter() throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - AddressControl addressControl = createManagementControl(address); String queue = "testQueue" + RandomUtil.randomString(); server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + AddressControl addressControl = createManagementControl(SimpleString.toSimpleString(queue)); + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); try (Connection connection = factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -704,8 +702,9 @@ public class AddressControlTest extends ManagementTestBase { public void setUp() throws Exception { super.setUp(); - Configuration config = createDefaultInVMConfig().setJMXManagementEnabled(true); - server = createServer(false, config); + Configuration config = createDefaultNettyConfig().setJMXManagementEnabled(true); + config.setJournalRetentionDirectory(config.getJournalDirectory() + "_ret"); // needed for replay tests + server = createServer(true, config); server.setMBeanServer(mbeanServer); server.start();