mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-21 01:15:50 +00:00
ARTEMIS-3523: Created delegated methods replay in addressControl
This commit is contained in:
parent
daf537a87a
commit
ebf8adc72b
@ -230,4 +230,13 @@ public interface AddressControl {
|
||||
@Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
|
||||
long purge() throws Exception;
|
||||
|
||||
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
|
||||
void replay(@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
|
||||
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
|
||||
|
||||
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
|
||||
void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan,
|
||||
@Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan,
|
||||
@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
|
||||
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
|
||||
}
|
||||
|
@ -19,8 +19,10 @@ package org.apache.activemq.artemis.core.management.impl;
|
||||
import javax.json.JsonArrayBuilder;
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.logs.AuditLogger;
|
||||
import org.apache.activemq.artemis.utils.JsonLoader;
|
||||
@ -574,6 +577,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
||||
return totalMsgs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replay(String target, String filter) throws Exception {
|
||||
server.replay(null, null, this.getAddress(), target, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replay(String startScan, String endScan, String target, String filter) throws Exception {
|
||||
|
||||
SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
|
||||
|
||||
Date startScanDate = format.parse(startScan);
|
||||
Date endScanDate = format.parse(endScan);
|
||||
|
||||
server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
|
||||
}
|
||||
|
||||
// Private -------------------------------------------------------
|
||||
|
||||
private long getMessageCount(final DurabilityType durability) {
|
||||
|
@ -16,10 +16,18 @@
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonString;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
@ -47,8 +55,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
@ -580,6 +590,111 @@ public class AddressControlTest extends ManagementTestBase {
|
||||
Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayWithoutDate() throws Exception {
|
||||
testReplaySimple(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayWithDate() throws Exception {
|
||||
testReplaySimple(true);
|
||||
}
|
||||
|
||||
private void testReplaySimple(boolean useDate) throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
||||
AddressControl addressControl = createManagementControl(address);
|
||||
String queue = "testQueue" + RandomUtil.randomString();
|
||||
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue jmsQueue = session.createQueue(queue);
|
||||
MessageProducer producer = session.createProducer(jmsQueue);
|
||||
producer.send(session.createTextMessage("before"));
|
||||
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(jmsQueue);
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
|
||||
addressControl.replay(queue, null);
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
|
||||
if (useDate) {
|
||||
addressControl.replay("dontexist", null); // just to force a move next file, and copy stuff into place
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
|
||||
Thread.sleep(1000); // waiting a second just to have the timestamp change
|
||||
String dateEnd = format.format(new Date());
|
||||
Thread.sleep(1000); // waiting a second just to have the timestamp change
|
||||
String dateStart = "19800101000000";
|
||||
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
producer.send(session.createTextMessage("after receiving"));
|
||||
}
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Assert.assertNotNull(consumer.receive());
|
||||
}
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
addressControl.replay(dateStart, dateEnd, queue, null);
|
||||
for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals("before", message.getText());
|
||||
}
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
} else {
|
||||
addressControl.replay(queue, null);
|
||||
|
||||
// replay of the replay, there will be two messages
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayFilter() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
||||
AddressControl addressControl = createManagementControl(address);
|
||||
String queue = "testQueue" + RandomUtil.randomString();
|
||||
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue jmsQueue = session.createQueue(queue);
|
||||
MessageProducer producer = session.createProducer(jmsQueue);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
TextMessage message = session.createTextMessage("message " + i);
|
||||
message.setIntProperty("i", i);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(jmsQueue);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
|
||||
addressControl.replay(queue, "i=5");
|
||||
TextMessage message = (TextMessage)consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(5, message.getIntProperty("i"));
|
||||
Assert.assertEquals("message 5", message.getText());
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
@ -607,6 +722,7 @@ public class AddressControlTest extends ManagementTestBase {
|
||||
|
||||
// Private -------------------------------------------------------
|
||||
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
}
|
||||
|
@ -178,6 +178,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
|
||||
return (long) proxy.invokeOperation("purge");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replay(String startScan, String endScan, String target, String filter) throws Exception {
|
||||
proxy.invokeOperation("replay", startScan, endScan, target, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replay(String target, String filter) throws Exception {
|
||||
proxy.invokeOperation("replay", target, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String sendMessage(Map<String, String> headers,
|
||||
int type,
|
||||
|
Loading…
x
Reference in New Issue
Block a user