ARTEMIS-3436 Journal Replay operation

This commit is contained in:
Clebert Suconic 2021-08-14 12:12:27 -04:00
parent edf688e706
commit 4d2fd89882
19 changed files with 1225 additions and 192 deletions

View File

@ -1946,5 +1946,17 @@ public interface ActiveMQServerControl {
@Operation(desc = "forces the broker to reload its configuration file", impact = MBeanOperationInfo.ACTION)
void reloadConfigurationFile() throws Exception;
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
void replay(@Parameter(name = "address", desc = "Name of the address to replay") String address,
@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
@Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan,
@Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan,
@Parameter(name = "address", desc = "Name of the address to replay") String address,
@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
@Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@ -324,6 +325,10 @@ public interface Journal extends ActiveMQComponent {
*/
void forceMoveNextFile() throws Exception;
default void forceBackup(int timeout, TimeUnit unit) throws Exception {
}
/**
* Returns the {@link JournalFile}s in use.
*

View File

@ -2891,6 +2891,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
@Override
public void forceBackup(int timeout, TimeUnit unit) throws Exception {
journalLock.writeLock().lock();
try {
moveNextFile(true, true);
} finally {
journalLock.writeLock().unlock();
}
CountDownLatch latch = new CountDownLatch(1);
compactorExecutor.execute(latch::countDown);
latch.await(timeout, unit);
}
// ActiveMQComponent implementation
// ---------------------------------------------------

View File

@ -26,57 +26,69 @@ public interface JournalReaderCallback {
default void done() {
}
void onReadAddRecord(RecordInfo info) throws Exception;
default void onReadAddRecord(RecordInfo info) throws Exception {
}
/**
* @param recordInfo
* @throws Exception
*/
void onReadUpdateRecord(RecordInfo recordInfo) throws Exception;
default void onReadUpdateRecord(RecordInfo recordInfo) throws Exception {
}
/**
* @param recordID
*/
void onReadDeleteRecord(long recordID) throws Exception;
default void onReadDeleteRecord(long recordID) throws Exception {
}
/**
* @param transactionID
* @param recordInfo
* @throws Exception
*/
void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
default void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
}
/**
* @param transactionID
* @param recordInfo
* @throws Exception
*/
void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
default void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
}
/**
* @param transactionID
* @param recordInfo
*/
void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
default void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
}
/**
* @param transactionID
* @param extraData
* @param numberOfRecords
*/
void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
default void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception {
}
/**
* @param transactionID
* @param numberOfRecords
*/
void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception;
default void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception {
}
/**
* @param transactionID
*/
void onReadRollbackRecord(long transactionID) throws Exception;
default void onReadRollbackRecord(long transactionID) throws Exception {
}
void markAsDataFile(JournalFile file);
default void markAsDataFile(JournalFile file) {
}
}

View File

@ -32,6 +32,7 @@ import javax.management.NotificationListener;
import javax.transaction.xa.Xid;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -115,6 +116,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
@ -4438,5 +4440,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
public void reloadConfigurationFile() throws Exception {
server.reloadConfigurationFile();
}
@Override
public void replay(String address, String target, String filter) throws Exception {
server.replay(null, null, address, target, filter);
}
@Override
public void replay(String startScan, String endScan, String address, String target, String filter) throws Exception {
SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
Date startScanDate = format.parse(startScan);
Date endScanDate = format.parse(endScan);
server.replay(startScanDate, endScanDate, address, target, filter);
}
}

View File

@ -75,6 +75,7 @@ import org.jboss.logging.Logger;
public class JournalStorageManager extends AbstractJournalStorageManager {
private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
public static final String ACTIVEMQ_DATA = "activemq-data";
protected SequentialFileFactory journalFF;
@ -217,7 +218,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles());
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, ACTIVEMQ_DATA, "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles());
}
// Life Cycle Handlers

View File

@ -515,4 +515,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229238, value = "No target to redirect the connection")
ActiveMQRedirectedException cannotRedirect();
@Message(id = 229239, value = "There is not retention configured. In order to use the replay method you must specify journal-retention-directory element on the broker.xml")
IllegalArgumentException noRetention();
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
import javax.management.MBeanServer;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -194,6 +195,8 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void registerActivationFailureListener(ActivationFailureListener listener);
void replay(Date start, Date end, String address, String target, String filter) throws Exception;
/**
* Remove a previously registered failure listener
*

View File

@ -175,6 +175,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugi
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -266,6 +267,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile ExecutorService ioExecutorPool;
private ReplayManager replayManager;
/**
* This is a thread pool for io tasks only.
* We can't use the same global executor to avoid starvations.
@ -388,9 +391,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
};
// Constructors
// ---------------------------------------------------------------------------------
public ActiveMQServerImpl() {
this(null, null, null);
}
@ -468,6 +468,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.parentServer = parentServer;
this.serviceRegistry = serviceRegistry == null ? new ServiceRegistryImpl() : serviceRegistry;
}
@Override
@ -480,8 +481,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return networkHealthCheck;
}
// life-cycle methods
// ----------------------------------------------------------------
@Override
public void replay(Date start, Date end, String address, String target, String filter) throws Exception {
if (replayManager == null) {
throw ActiveMQMessageBundle.BUNDLE.noRetention();
}
replayManager.replay(start, end, address, target, filter);
}
/**
* A Callback for tests
@ -598,6 +605,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
initializeCriticalAnalyzer();
if (configuration.getJournalRetentionLocation() != null) {
this.replayManager = new ReplayManager(this);
} else {
this.replayManager = null;
}
startDate = new Date();
state = SERVER_STATE.STARTING;

View File

@ -0,0 +1,260 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.replay;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.jboss.logging.Logger;
public class ReplayManager {
public static SimpleDateFormat newRetentionSimpleDateFormat() {
return new SimpleDateFormat("yyyyMMddHHmmss");
}
private static final Logger logger = Logger.getLogger(ReplayManager.class);
private final ActiveMQServer server;
private JournalImpl journal;
private final File retentionFolder;
private final SimpleDateFormat dateFormat = newRetentionSimpleDateFormat();
private final AtomicBoolean running = new AtomicBoolean(false);
public ReplayManager(ActiveMQServer server) {
this.server = server;
this.retentionFolder = server.getConfiguration().getJournalRetentionLocation();
}
public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception {
if (running.compareAndSet(false, true)) {
try {
actualReplay(start, end, sourceAddress, targetAddress, filter);
} catch (Exception e) {
logger.warn(e.getMessage());
throw e;
} finally {
running.set(false);
}
} else {
throw new RuntimeException("Replay manager is currently busy with another operation");
}
}
private void actualReplay(Date start, Date end, String sourceAddress, String targetAddress, String filterStr) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Replay::" + sourceAddress);
}
if (sourceAddress == null) {
throw new NullPointerException("sourceAddress");
}
if (journal == null) {
// notice this routing plays single threaded. no need for any sort of synchronization here
journal = (JournalImpl)server.getStorageManager().getMessageJournal();
}
Filter filter;
if (filterStr != null) {
filter = FilterImpl.createFilter(filterStr);
} else {
filter = null;
}
journal.forceBackup(1, TimeUnit.MINUTES);
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(retentionFolder, null, 1);
// Will use only default values. The load function should adapt to anything different
JournalImpl messagesJournal = new JournalImpl(server.getConfiguration().getJournalFileSize(), server.getConfiguration().getJournalMinFiles(), server.getConfiguration().getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> files = messagesJournal.orderFiles();
RoutingContext context = new RoutingContextImpl(null);
HashMap<Long, LinkedHashSet<JournalFile>> largeMessageLocations = new HashMap<>();
for (JournalFile file : files) {
if (start != null || end != null) {
long fileEpochTime = journal.getDatePortionMillis(file.getFile().getFileName());
if (logger.isDebugEnabled()) {
String datePortion = journal.getDatePortion(file.getFile().getFileName());
logger.debug("Evaluating replay for file " + file.getFile().getFileName() + ", datePortion=" + datePortion + "\n" +
"\tInterval evaluated: start(" + start + ") --- file(" + new Date(fileEpochTime) + ") --- end(" + end + ")\n" +
"\tepoch times: start(" + start.getTime() + ") --- file(" + fileEpochTime + ") + end(" + end.getTime() + ")");
}
if (start != null && fileEpochTime < start.getTime()) {
if (logger.isDebugEnabled()) {
logger.debug("File " + file.getFile().getFileName() + " being skipped on start comparison");
}
continue;
}
if (end != null && fileEpochTime > end.getTime()) {
if (logger.isDebugEnabled()) {
logger.debug("File " + file.getFile().getFileName() + " being skipped on end comparison");
}
continue;
}
}
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
switch (info.getUserRecordType()) {
case JournalRecordIds.ADD_MESSAGE_BODY:
LinkedHashSet<JournalFile> files = largeMessageLocations.get(info.id);
if (files == null) {
files = new LinkedHashSet<>();
largeMessageLocations.put(info.id, files);
}
files.add(file);
break;
default:
onReadAddRecord(info);
}
}
@Override
public void onReadAddRecord(RecordInfo info) throws Exception {
if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(info.data);
LargeServerMessage message = new LargeServerMessageImpl(server.getStorageManager());
LargeMessagePersister.getInstance().decode(buffer, message, null);
route(filter, context, messagesFF, message.toMessage(), sourceAddress, targetAddress, largeMessageLocations);
} else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(info.data);
Message message = MessagePersister.getInstance().decode(buffer, null, null, server.getStorageManager());
route(filter, context, messagesFF, message, sourceAddress, targetAddress, largeMessageLocations);
}
}
@Override
public void onReadUpdateRecord(RecordInfo info) throws Exception {
onReadAddRecord(info);
}
@Override
public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadAddRecord(info);
}
@Override
public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadUpdateRecord(info);
}
}, null, false, null);
}
}
private boolean messageMatch(Filter filter, Message message, String sourceAddress, String targetAddress) {
if (message.getAddress() != null && message.getAddress().equals(sourceAddress)) {
if (filter != null) {
if (!filter.match(message)) {
return false;
}
}
if (targetAddress != null && !targetAddress.equals(sourceAddress)) {
message.setAddress(targetAddress);
message.reencode();
}
return true;
} else {
return false;
}
}
private void route(Filter filter, RoutingContext context, SequentialFileFactory messagesFF, Message message, String sourceAddress, String targetAddress, HashMap<Long, LinkedHashSet<JournalFile>> filesMap) throws Exception {
if (messageMatch(filter, message, sourceAddress, targetAddress)) {
final long originalMessageID = message.getMessageID();
message.setMessageID(server.getStorageManager().generateID());
if (message.isLargeMessage()) {
readLargeMessageBody(messagesFF, message, filesMap, originalMessageID);
}
if (targetAddress != null && !sourceAddress.equals(targetAddress)) {
message.setAddress(targetAddress);
message.reencode();
}
server.getPostOffice().route(message, context, false, false, null);
context.clear();
} else {
if (message.isLargeMessage()) {
filesMap.remove(message.getMessageID());
}
}
}
private void readLargeMessageBody(SequentialFileFactory messagesFF,
Message message,
HashMap<Long, LinkedHashSet<JournalFile>> filesMap,
long originalMessageID) throws Exception {
long newMessageID = message.getMessageID();
SequentialFile largeMessageFile = server.getStorageManager().createFileForLargeMessage(newMessageID, true);
largeMessageFile.open();
LinkedHashSet<JournalFile> files = filesMap.get(originalMessageID);
if (files != null) {
for (JournalFile file : files) {
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
if (info.userRecordType == JournalRecordIds.ADD_MESSAGE_BODY && info.id == originalMessageID) {
server.getStorageManager().addBytesToLargeMessage(largeMessageFile, newMessageID, info.data);
}
}
});
}
}
largeMessageFile.close();
}
}

View File

@ -19,16 +19,20 @@ package org.apache.activemq.artemis.tests.integration.management;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -87,6 +91,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryMethod;
@ -4079,6 +4084,108 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
}
@Test
public void testReplayWithoutDate() throws Exception {
testReplaySimple(false);
}
@Test
public void testReplayWithDate() throws Exception {
testReplaySimple(true);
}
private void testReplaySimple(boolean useDate) throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
String queue = "testQueue" + RandomUtil.randomString();
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageProducer producer = session.createProducer(jmsQueue);
producer.send(session.createTextMessage("before"));
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
serverControl.replay(queue, queue, null);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
if (useDate) {
serverControl.replay("dontexist", "dontexist", null); // just to force a move next file, and copy stuff into place
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
Thread.sleep(1000); // waiting a second just to have the timestamp change
String dateEnd = format.format(new Date());
Thread.sleep(1000); // waiting a second just to have the timestamp change
String dateStart = "19800101000000";
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("after receiving"));
}
for (int i = 0; i < 100; i++) {
Assert.assertNotNull(consumer.receive());
}
Assert.assertNull(consumer.receiveNoWait());
serverControl.replay(dateStart, dateEnd, queue, queue, null);
for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("before", message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
} else {
serverControl.replay(queue, queue, null);
// replay of the replay, there will be two messages
for (int i = 0; i < 2; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}
@Test
public void testReplayFilter() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
String queue = "testQueue" + RandomUtil.randomString();
server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("message " + i);
message.setIntProperty("i", i);
producer.send(message);
}
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 10; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
serverControl.replay(queue, queue, "i=5");
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(5, message.getIntProperty("i"));
Assert.assertEquals("message 5", message.getText());
Assert.assertNull(consumer.receiveNoWait());
}
}
@Test
public void testBrokerConnections() throws Exception {
@ -4130,7 +4237,6 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
}
protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>();
@ -4200,6 +4306,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
securityConfiguration.addRole("myUser", "guest");
securityConfiguration.setDefaultUser("guest");
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration);
conf.setJournalRetentionDirectory(conf.getJournalDirectory() + "_ret"); // needed for replay tests
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, securityManager, true));
server.start();

View File

@ -1645,6 +1645,20 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Parameter(name = "Page Size") int pageSize) throws Exception {
return (String) proxy.invokeOperation("listQueues", options, page, pageSize);
}
@Override
public void replay(String address, String target, String filter) throws Exception {
proxy.invokeOperation("replay", address, target, filter);
}
@Override
public void replay(String startScan,
String endScan,
String address,
String target,
String filter) throws Exception {
proxy.invokeOperation("replay", startScan, endScan, address, target, filter);
}
};
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.retention;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
public class ReplayTest extends ActiveMQTestBase {
ActiveMQServer server;
@Override
public void setUp() throws Exception {
super.setUp();
server = addServer(createServer(true, true));
server.getConfiguration().setJournalRetentionDirectory(getJournalDir() + "retention");
server.getConfiguration().setJournalFileSize(100 * 1024);
server.start();
server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo("t2").addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration("t2").setAddress("t2").setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testReplayAMQP() throws Exception {
testReplay("AMQP", 10);
}
@Test
public void testReplayCore() throws Exception {
testReplay("CORE", 10);
}
public void testReplay(String protocol, int size) throws Exception {
StringBuffer buffer = new StringBuffer();
buffer.append(RandomUtil.randomString());
for (int i = 0; i < size; i++) {
buffer.append("*");
}
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("t1");
MessageProducer producer = session.createProducer(null);
producer.send(queue, session.createTextMessage(buffer.toString()));
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
server.replay(null, null, "t1", "t2", null);
Queue t2 = session.createQueue("t2");
MessageConsumer consumert2 = session.createConsumer(t2);
TextMessage receivedMessage = (TextMessage) consumert2.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(buffer.toString(), receivedMessage.getText());
Assert.assertNull(consumert2.receiveNoWait());
server.replay(null, null, "t2", "t1", null);
receivedMessage = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertNull(consumer.receiveNoWait());
// invalid filter.. nothing should be re played
server.replay(null, null, "t1", "t1", "foo='foo'");
Assert.assertNull(consumer.receiveNoWait());
}
}
@Test
public void testReplayLargeAMQP() throws Exception {
testReplay("AMQP", 500 * 1024);
}
@Test
public void testReplayLargeCore() throws Exception {
testReplay("CORE", 500 * 1024);
}
}

View File

@ -1159,6 +1159,28 @@
<configuration>${basedir}/target/classes/servers/bridgeTransfer/serverB</configuration>
</configuration>
</execution>
<!-- Used on TestRetention -->
<execution>
<phase>test-compile</phase>
<id>create-replay</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/replay/replay</instance>
<configuration>${basedir}/target/classes/servers/replay/replay</configuration>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>

View File

@ -0,0 +1,245 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>replay</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-retention-directory period="7" unit="DAYS">./data/retention</journal-retention-directory>
<!--
if you want to retain your journal uncomment this following configuration.
This will allow your system to keep 7 days of your data, up to 10G. Tweak it accordingly to your use case and capacity.
it is recommended to use a separate storage unit from the journal for performance considerations.
<journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory>>
-->
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="RetentionTest">
<anycast>
<queue name="RetentionTest"/>
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.org/schema">
<connector connector-port="1099"/>
<authorisation>
<whitelist>
<entry domain="hawtio"/>
</whitelist>
<default-access>
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<access method="*" roles="amq"/>
</default-access>
<role-access>
<match domain="org.apache.activemq.artemis">
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<!-- Note count and browse are need to access the browse tab in the console-->
<access method="browse*" roles="amq"/>
<access method="count*" roles="amq"/>
<access method="*" roles="amq"/>
</match>
<!--example of how to configure a specific object-->
<!--<match domain="org.apache.activemq.artemis" key="subcomponent=queues">
<access method="list*" roles="view,update,amq"/>
<access method="get*" roles="view,update,amq"/>
<access method="is*" roles="view,update,amq"/>
<access method="set*" roles="update,amq"/>
<access method="*" roles="amq"/>
</match>-->
</role-access>
</authorisation>
</management-context>

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.smoke.common;
import javax.management.MBeanServerInvocationHandler;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@ -26,6 +27,8 @@ import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@ -99,6 +102,32 @@ public class SmokeTestBase extends ActiveMQTestBase {
return getJmxConnector(JMX_SERVER_HOSTNAME, JMX_SERVER_PORT);
}
protected static JMXConnector newJMXFactory(String uri) throws Throwable {
return JMXConnectorFactory.connect(new JMXServiceURL(uri));
}
protected static ActiveMQServerControl getServerControl(String uri,
ObjectNameBuilder builder,
long timeout) throws Throwable {
long expireLoop = System.currentTimeMillis() + timeout;
Throwable lastException = null;
do {
try {
JMXConnector connector = newJMXFactory(uri);
ActiveMQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), builder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false);
serverControl.isActive(); // making one call to make sure it's working
return serverControl;
} catch (Throwable e) {
lastException = e;
Thread.sleep(500);
}
}
while (expireLoop > System.currentTimeMillis());
throw lastException;
}
protected static JMXConnector getJmxConnector(String hostname, int port) throws MalformedURLException {
// Without this, the RMI server would bind to the default interface IP (the user's local IP mostly)
System.setProperty("java.rmi.server.hostname", hostname);

View File

@ -23,10 +23,6 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServerInvocationHandler;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@ -70,77 +66,40 @@ import org.junit.Test;
*/
public class DNSSwitchTest extends SmokeTestBase {
private static boolean USING_SPAWN = true;
public static final File ETC_HOSTS = new File("/etc/hosts");
private static File ETC_BACKUP;
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 10099;
private static final int JMX_SERVER_PORT_1 = 10199;
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static String backupURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi";
static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "live", true);
static ObjectNameBuilder backupNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "backup", true);
// This is a more intrusive option to use with JDK 8
// Instead of using a separate jdk hsots, which is not supported on jdk8,
// with this option set to true we would use the original /etc/hosts
private static boolean USE_ETC_HOSTS = System.getProperty("java.version").startsWith("1.8");
private static final Logger logger = Logger.getLogger(DNSSwitchTest.class);
private static final String SERVER_NAME_0 = "dnsswitch";
private static final String SERVER_NAME_1 = "dnsswitch2";
private static final String SERVER_STANDARD = "standard";
private static final String SERVER_LIVE = "dnsswitch-replicated-main";
private static final String SERVER_LIVE_NORETRYDNS = "dnsswitch-replicated-main-noretrydns";
private static final String SERVER_BACKUP = "dnsswitch-replicated-backup";
private static final String SERVER_LIVE_PING = "dnsswitch-replicated-main-withping";
private static final String SERVER_BACKUP_PING = "dnsswitch-replicated-backup-withping";
// 192.0.2.0 is reserved for documentation (and testing on this case).
private static final String FIRST_IP = "192.0.2.0";
private static final String SECOND_IP = "192.0.3.0";
private static final String THIRD_IP = "192.0.3.0";
private static final String FOURTH_IP = "192.0.4.0";
private static final String INVALID_IP = "203.0.113.0";
private static final String hostsFile = System.getProperty("jdk.net.hosts.file");
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static String backupURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi";
static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "live", true);
static ObjectNameBuilder backupNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "backup", true);
private static boolean USING_SPAWN = true;
private static File ETC_BACKUP;
// This is a more intrusive option to use with JDK 8
// Instead of using a separate jdk hsots, which is not supported on jdk8,
// with this option set to true we would use the original /etc/hosts
private static boolean USE_ETC_HOSTS = System.getProperty("java.version").startsWith("1.8");
private static String serverLocation;
@Rule
public NetUtilResource netUtilResource = new NetUtilResource();
private static JMXConnector newJMXFactory(String uri) throws Throwable {
return JMXConnectorFactory.connect(new JMXServiceURL(uri));
}
private static ActiveMQServerControl getServerControl(String uri,
ObjectNameBuilder builder,
long timeout) throws Throwable {
long expireLoop = System.currentTimeMillis() + timeout;
Throwable lastException = null;
do {
try {
JMXConnector connector = newJMXFactory(uri);
ActiveMQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), builder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false);
serverControl.isActive(); // making one call to make sure it's working
return serverControl;
} catch (Throwable e) {
lastException = e;
Thread.sleep(500);
}
}
while (expireLoop > System.currentTimeMillis());
throw lastException;
}
@BeforeClass
public static void beforeClassMethod() throws Exception {
NetUtil.skipIfNotSupportedOS();
@ -233,7 +192,6 @@ public class DNSSwitchTest extends SmokeTestBase {
Assert.assertTrue("You must send pairs as overrideParameters", overrideParameters.length % 2 == 0);
String javaVersion = System.getProperty("java.version");
File security;
@ -254,25 +212,6 @@ public class DNSSwitchTest extends SmokeTestBase {
securityProperties.store(new FileOutputStream(outputSecurity), "# generated by DNSSwitchTest");
}
private static final String hostsFile = System.getProperty("jdk.net.hosts.file");
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
cleanupData(SERVER_STANDARD);
cleanupData(SERVER_LIVE);
cleanupData(SERVER_LIVE_NORETRYDNS);
cleanupData(SERVER_BACKUP);
cleanupData(SERVER_LIVE_PING);
cleanupData(SERVER_BACKUP_PING);
}
@Test
public void testBackupRedefinition() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
public static void testBackupRedefinition(String[] args) throws Throwable {
NetUtil.netUp(FIRST_IP, "lo:first");
NetUtil.netUp(SECOND_IP, "lo:second");
@ -359,12 +298,6 @@ public class DNSSwitchTest extends SmokeTestBase {
}
@Test
public void testBackupRedefinition2() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
public static void testBackupRedefinition2(String[] args) throws Throwable {
NetUtil.netUp(FIRST_IP, "lo:first");
NetUtil.netUp(SECOND_IP, "lo:second");
@ -396,7 +329,6 @@ public class DNSSwitchTest extends SmokeTestBase {
Wait.assertTrue(backupControl::isStarted);
Wait.assertTrue(backupControl::isReplicaSync);
saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND");
serverBackup.destroyForcibly();
serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0);
@ -459,12 +391,6 @@ public class DNSSwitchTest extends SmokeTestBase {
}
@Test
public void testBackupRedefinition3() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
public static void testBackupRedefinition3(String[] args) throws Throwable {
NetUtil.netUp(FIRST_IP, "lo:first");
NetUtil.netUp(SECOND_IP, "lo:second");
@ -496,7 +422,6 @@ public class DNSSwitchTest extends SmokeTestBase {
Wait.assertTrue(backupControl::isStarted);
Wait.assertTrue(backupControl::isReplicaSync);
saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND");
serverBackup.destroyForcibly();
serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0);
@ -556,17 +481,10 @@ public class DNSSwitchTest extends SmokeTestBase {
serverLive.destroyForcibly();
}
}
}
@Test
public void testCantReachBack() throws Throwable {
spawnRun(serverLocation, "testCantReachBack", getServerLocation(SERVER_LIVE_NORETRYDNS), getServerLocation(SERVER_BACKUP));
}
public static void testCantReachBack(String[] args) throws Throwable {
NetUtil.netUp(FIRST_IP, "lo:first");
NetUtil.netUp(SECOND_IP, "lo:second");
@ -604,12 +522,6 @@ public class DNSSwitchTest extends SmokeTestBase {
}
@Test
public void testWithPing() throws Throwable {
spawnRun(serverLocation, "testWithPing", getServerLocation(SERVER_LIVE_PING), getServerLocation(SERVER_BACKUP_PING));
}
public static void testWithPing(String[] args) throws Throwable {
NetUtil.netUp(FIRST_IP, "lo:first");
NetUtil.netUp(SECOND_IP, "lo:second");
@ -640,14 +552,12 @@ public class DNSSwitchTest extends SmokeTestBase {
serverBackup.destroyForcibly();
//Thread.sleep(10_000);
serverLive.destroyForcibly();
serverLive = ServerUtil.startServer(args[1], "live", "tcp://FIRST:61616", 0);
Thread.sleep(1_000);
logger.debug("going to re-enable ping");
// Enable the address just for ping now
saveConf(hostsFile, THIRD_IP, "PINGPLACE");
@ -683,25 +593,16 @@ public class DNSSwitchTest extends SmokeTestBase {
serverLive.destroyForcibly();
}
}
}
@Test
public void testWithoutPingKill() throws Throwable {
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "1");
}
@Test
public void testWithoutPingRestart() throws Throwable {
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "0");
}
/**
* arg[0] = constant "testWithoutPing" to be used on reflection through main(String arg[])
* arg[1] = serverlive
* arg[2] = server backup
* arg[3] = 1 | 0 (kill the backup = 1, stop the backup = 0);
*
* @param args
* @throws Throwable
*/
@ -778,12 +679,10 @@ public class DNSSwitchTest extends SmokeTestBase {
serverLive.destroyForcibly();
}
}
}
private static void connectAndWaitBackup() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=true");
Assert.assertTrue(connectionFactory.getServerLocator().isHA());
@ -792,12 +691,6 @@ public class DNSSwitchTest extends SmokeTestBase {
connection.close();
}
@Test
public void testFailoverDifferentIPRedefinition() throws Throwable {
spawnRun(serverLocation, "testFailoverDifferentIPRedefinition", serverLocation, getServerLocation(SERVER_NAME_1));
}
public static void testFailoverDifferentIPRedefinition(String[] arg) throws Throwable {
NetUtil.netUp(FIRST_IP);
NetUtil.netUp(SECOND_IP);
@ -850,20 +743,6 @@ public class DNSSwitchTest extends SmokeTestBase {
}
@Test
public void testInitialConnector() throws Throwable {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://test:61616?initialConnectAttempts=500&retryInterval=100&connect-timeout-millis=100");
startServer(SERVER_STANDARD, 0, 30000);
String location = getServerLocation(SERVER_NAME_0);
spawnRun(location, "testInitialConnector");
// If you eed to debug the test, comment out spawnRun, and call the method directly
// you will need to add roperties on the JDK for that
// Add the properties you need
// testInitialConnector("testInitialConnector", location);
}
// called with reflection
public static void testInitialConnector(String... arg) throws Throwable {
saveConf(hostsFile, "192.0.0.3", "test");
@ -903,12 +782,6 @@ public class DNSSwitchTest extends SmokeTestBase {
Assert.assertFalse(connecting.isAlive());
}
// This test is just validating the DNS is not being cached on the separte VM
@Test
public void testSimpleResolution() throws Throwable {
spawnRun(serverLocation, "testSimpleResolution");
}
// called with reflection
public static void testSimpleResolution(String[] arg) throws Throwable {
// This is just to validate the DNS hosts is picking up the right host
@ -920,11 +793,6 @@ public class DNSSwitchTest extends SmokeTestBase {
validateIP("test", "192.0.0.3");
}
@Test
public void testSplitBrainDetection() throws Throwable {
spawnRun(serverLocation, "testSplitBrainDetection", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
/**
* arg[0] = constant "testSplitBrainDetection" to be used on reflection through main(String arg[])
* arg[1] = serverlive
@ -975,8 +843,7 @@ public class DNSSwitchTest extends SmokeTestBase {
Wait.assertTrue(() -> !liveControl.isReplicaSync());
logger.debug("Waiting enough to let live spread its topology around");
try (ActiveMQConnectionFactory firstCf = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=false");
Connection ignored = firstCf.createConnection()) {
try (ActiveMQConnectionFactory firstCf = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=false"); Connection ignored = firstCf.createConnection()) {
waitForTopology(firstCf.getServerLocator().getTopology(), 60_000, 1, 1);
final Topology topology = firstCf.getServerLocator().getTopology();
final TopologyMemberImpl member = topology.getMember(liveControl.getNodeID());
@ -993,8 +860,7 @@ public class DNSSwitchTest extends SmokeTestBase {
Assert.assertEquals("SECOND", backup.getParams().get("host"));
Assert.assertEquals("61716", backup.getParams().get("port"));
}
try (ActiveMQConnectionFactory secondCf = new ActiveMQConnectionFactory("tcp://SECOND:61716?ha=false");
Connection ignored = secondCf.createConnection()) {
try (ActiveMQConnectionFactory secondCf = new ActiveMQConnectionFactory("tcp://SECOND:61716?ha=false"); Connection ignored = secondCf.createConnection()) {
logger.debug("Waiting until second broker topology has just a single live node");
waitForTopology(secondCf.getServerLocator().getTopology(), 60_000, 1, 0);
final Topology topology = secondCf.getServerLocator().getTopology();
@ -1018,33 +884,6 @@ public class DNSSwitchTest extends SmokeTestBase {
}
}
/**
* it will continue the test on a spwned VM with the properties we need for this test
*/
private void spawnRun(String location, String... args) throws Throwable {
// We have to run part of the test on a separate VM, as we need VM settings to tweak the DNS
String securityProperties = System.getProperty("java.security.properties");
if (securityProperties != null && securityProperties.equals(location + "/etc/zerocache.security")) {
logger.info("No need to spawn a VM, the zerocache is already in place");
System.setProperty("artemis.config.location", location);
USING_SPAWN = false;
main(args);
} else {
securityProperties = "-Djava.security.properties=" + location + "/etc/zerocache.security";
String hostProperties = "-Djdk.net.hosts.file=" + location + "/etc/hosts.conf";
String configLocation = "-Dartemis.config.location=" + location;
String temporaryLocation = "-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir");
logger.info("if you would like to run without Spawn for debugging purposes, add these properties to your VM arguments on this test: " + securityProperties + " " + hostProperties);
Process p = SpawnedVMSupport.spawnVM(DNSSwitchTest.class.getName(), new String[]{securityProperties, hostProperties, configLocation, temporaryLocation}, args);
addProcess(p);
Assert.assertEquals(1, p.waitFor());
}
}
public static void saveConf(String fileName, String... hostDefinition) throws Exception {
if (USE_ETC_HOSTS) {
recoverETCHosts();
@ -1108,4 +947,109 @@ public class DNSSwitchTest extends SmokeTestBase {
}
}
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
cleanupData(SERVER_STANDARD);
cleanupData(SERVER_LIVE);
cleanupData(SERVER_LIVE_NORETRYDNS);
cleanupData(SERVER_BACKUP);
cleanupData(SERVER_LIVE_PING);
cleanupData(SERVER_BACKUP_PING);
}
@Test
public void testBackupRedefinition() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
@Test
public void testBackupRedefinition2() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
@Test
public void testBackupRedefinition3() throws Throwable {
spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
@Test
public void testCantReachBack() throws Throwable {
spawnRun(serverLocation, "testCantReachBack", getServerLocation(SERVER_LIVE_NORETRYDNS), getServerLocation(SERVER_BACKUP));
}
@Test
public void testWithPing() throws Throwable {
spawnRun(serverLocation, "testWithPing", getServerLocation(SERVER_LIVE_PING), getServerLocation(SERVER_BACKUP_PING));
}
@Test
public void testWithoutPingKill() throws Throwable {
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "1");
}
@Test
public void testWithoutPingRestart() throws Throwable {
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "0");
}
@Test
public void testFailoverDifferentIPRedefinition() throws Throwable {
spawnRun(serverLocation, "testFailoverDifferentIPRedefinition", serverLocation, getServerLocation(SERVER_NAME_1));
}
@Test
public void testInitialConnector() throws Throwable {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://test:61616?initialConnectAttempts=500&retryInterval=100&connect-timeout-millis=100");
startServer(SERVER_STANDARD, 0, 30000);
String location = getServerLocation(SERVER_NAME_0);
spawnRun(location, "testInitialConnector");
// If you eed to debug the test, comment out spawnRun, and call the method directly
// you will need to add roperties on the JDK for that
// Add the properties you need
// testInitialConnector("testInitialConnector", location);
}
// This test is just validating the DNS is not being cached on the separte VM
@Test
public void testSimpleResolution() throws Throwable {
spawnRun(serverLocation, "testSimpleResolution");
}
@Test
public void testSplitBrainDetection() throws Throwable {
spawnRun(serverLocation, "testSplitBrainDetection", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
}
/**
* it will continue the test on a spwned VM with the properties we need for this test
*/
private void spawnRun(String location, String... args) throws Throwable {
// We have to run part of the test on a separate VM, as we need VM settings to tweak the DNS
String securityProperties = System.getProperty("java.security.properties");
if (securityProperties != null && securityProperties.equals(location + "/etc/zerocache.security")) {
logger.info("No need to spawn a VM, the zerocache is already in place");
System.setProperty("artemis.config.location", location);
USING_SPAWN = false;
main(args);
} else {
securityProperties = "-Djava.security.properties=" + location + "/etc/zerocache.security";
String hostProperties = "-Djdk.net.hosts.file=" + location + "/etc/hosts.conf";
String configLocation = "-Dartemis.config.location=" + location;
String temporaryLocation = "-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir");
logger.info("if you would like to run without Spawn for debugging purposes, add these properties to your VM arguments on this test: " + securityProperties + " " + hostProperties);
Process p = SpawnedVMSupport.spawnVM(DNSSwitchTest.class.getName(), new String[]{securityProperties, hostProperties, configLocation, temporaryLocation}, args);
addProcess(p);
Assert.assertEquals(1, p.waitFor());
}
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.retention;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ReplayTest extends SmokeTestBase {
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 1099;
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "replay", true);
public static final String SERVER_NAME_0 = "replay/replay";
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
startServer(SERVER_NAME_0, 0, 30000);
disableCheckThread();
}
@Test
public void testReplayAMQP() throws Throwable {
testReplay("AMQP", 300, 100);
}
@Test
public void testReplayAMQPLarge() throws Throwable {
testReplay("AMQP", 3, 200 * 1024);
}
@Test
public void testReplayCore() throws Throwable {
testReplay("CORE", 300, 100);
}
@Test
public void testReplayCoreLarge() throws Throwable {
testReplay("CORE", 3, 200 * 1024);
}
private void testReplay(String protocol, int NUMBER_OF_MESSAGES, int bodySize) throws Throwable {
final String queueName = "RetentionTest";
ActiveMQServerControl serverControl = getServerControl(liveURI, liveNameBuilder, 5000);
String bufferStr;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
bufferStr = RandomUtil.randomString() + buffer.toString();
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(bufferStr);
message.setIntProperty("i", i);
producer.send(queue, message);
}
session.commit();
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(bufferStr, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
session.commit();
serverControl.replay(queueName, queueName, null);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(bufferStr, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
session.commit();
serverControl.replay(queueName, queueName, "i=1");
for (int i = 0; i < 2; i++) { // replay of a replay will give you 2 messages
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("i"));
Assert.assertEquals(bufferStr, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
session.commit();
serverControl.replay(queueName, queueName, "foo='foo'");
Assert.assertNull(consumer.receiveNoWait());
}
}
}