This commit is contained in:
Clebert Suconic 2021-10-21 13:21:24 -04:00
commit 84ffa9b37a
5 changed files with 163 additions and 4 deletions

View File

@ -230,4 +230,13 @@ public interface AddressControl {
@Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
long purge() throws Exception;
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
void replay(@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan,
@Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan,
@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
}

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.json.JsonArrayBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
@ -574,6 +577,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return totalMsgs;
}
@Override
public void replay(String target, String filter) throws Exception {
server.replay(null, null, this.getAddress(), target, filter);
}
@Override
public void replay(String startScan, String endScan, String target, String filter) throws Exception {
SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
Date startScanDate = format.parse(startScan);
Date endScanDate = format.parse(endScan);
server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
}
// Private -------------------------------------------------------
private long getMessageCount(final DurabilityType durability) {

View File

@ -84,7 +84,7 @@ public class ReplayManager {
}
}
private void actualReplay(Date start, Date end, String sourceAddress, String targetAddress, String filterStr) throws Exception {
private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Replay::" + sourceAddress);
}
@ -92,6 +92,12 @@ public class ReplayManager {
throw new NullPointerException("sourceAddress");
}
if (targetAddressParameter == null || targetAddressParameter.trim().isEmpty()) {
targetAddressParameter = sourceAddress;
}
final String targetAddress = targetAddressParameter;
if (journal == null) {
// notice this routing plays single threaded. no need for any sort of synchronization here
journal = (JournalImpl)server.getStorageManager().getMessageJournal();

View File

@ -16,10 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonString;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -47,8 +55,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -353,7 +363,6 @@ public class AddressControlTest extends ManagementTestBase {
ClientSessionFactory sf2 = createSessionFactory(locator2);
session = sf2.createSession(false, true, false);
session.createQueue(new QueueConfiguration(address));
Assert.assertEquals(1024, addressControl.getNumberOfBytesPerPage());
}
@ -580,6 +589,110 @@ public class AddressControlTest extends ManagementTestBase {
Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100);
}
@Test
public void testReplayWithoutDate() throws Exception {
testReplaySimple(false);
}
@Test
public void testReplayWithDate() throws Exception {
testReplaySimple(true);
}
private void testReplaySimple(boolean useDate) throws Exception {
String queue = "testQueue" + RandomUtil.randomString();
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
AddressControl addressControl = createManagementControl(SimpleString.toSimpleString(queue));
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageProducer producer = session.createProducer(jmsQueue);
producer.send(session.createTextMessage("before"));
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
addressControl.replay(queue, null);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
if (useDate) {
addressControl.replay("dontexist", null); // just to force a move next file, and copy stuff into place
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
Thread.sleep(1000); // waiting a second just to have the timestamp change
String dateEnd = format.format(new Date());
Thread.sleep(1000); // waiting a second just to have the timestamp change
String dateStart = "19800101000000";
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("after receiving"));
}
for (int i = 0; i < 100; i++) {
Assert.assertNotNull(consumer.receive());
}
Assert.assertNull(consumer.receiveNoWait());
addressControl.replay(dateStart, dateEnd, queue, null);
for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("before", message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
} else {
addressControl.replay(queue, null);
// replay of the replay, there will be two messages
for (int i = 0; i < 2; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}
@Test
public void testReplayFilter() throws Exception {
String queue = "testQueue" + RandomUtil.randomString();
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
AddressControl addressControl = createManagementControl(SimpleString.toSimpleString(queue));
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("message " + i);
message.setIntProperty("i", i);
producer.send(message);
}
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 10; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
addressControl.replay(queue, "i=5");
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(5, message.getIntProperty("i"));
Assert.assertEquals("message 5", message.getText());
Assert.assertNull(consumer.receiveNoWait());
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -589,8 +702,9 @@ public class AddressControlTest extends ManagementTestBase {
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultInVMConfig().setJMXManagementEnabled(true);
server = createServer(false, config);
Configuration config = createDefaultNettyConfig().setJMXManagementEnabled(true);
config.setJournalRetentionDirectory(config.getJournalDirectory() + "_ret"); // needed for replay tests
server = createServer(true, config);
server.setMBeanServer(mbeanServer);
server.start();
@ -607,6 +721,7 @@ public class AddressControlTest extends ManagementTestBase {
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -178,6 +178,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (long) proxy.invokeOperation("purge");
}
@Override
public void replay(String startScan, String endScan, String target, String filter) throws Exception {
proxy.invokeOperation("replay", startScan, endScan, target, filter);
}
@Override
public void replay(String target, String filter) throws Exception {
proxy.invokeOperation("replay", target, filter);
}
@Override
public String sendMessage(Map<String, String> headers,
int type,