diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 29853d6f36..ea1b0abcdb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -1946,5 +1946,17 @@ public interface ActiveMQServerControl { @Operation(desc = "forces the broker to reload its configuration file", impact = MBeanOperationInfo.ACTION) void reloadConfigurationFile() throws Exception; + + @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION) + void replay(@Parameter(name = "address", desc = "Name of the address to replay") String address, + @Parameter(name = "target", desc = "Where the replay data should be sent") String target, + @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; + + @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION) + void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan, + @Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan, + @Parameter(name = "address", desc = "Name of the address to replay") String address, + @Parameter(name = "target", desc = "Where the replay data should be sent") String target, + @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index e10dc85573..5b5df258d0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal; import java.io.File; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; @@ -324,6 +325,10 @@ public interface Journal extends ActiveMQComponent { */ void forceMoveNextFile() throws Exception; + default void forceBackup(int timeout, TimeUnit unit) throws Exception { + } + + /** * Returns the {@link JournalFile}s in use. * diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 651f4573df..0136d262ae 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2891,6 +2891,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override + public void forceBackup(int timeout, TimeUnit unit) throws Exception { + journalLock.writeLock().lock(); + try { + moveNextFile(true, true); + } finally { + journalLock.writeLock().unlock(); + } + + CountDownLatch latch = new CountDownLatch(1); + compactorExecutor.execute(latch::countDown); + latch.await(timeout, unit); + } + + // ActiveMQComponent implementation // --------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java index eabdae9080..c52c9ee3f3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalReaderCallback.java @@ -26,57 +26,69 @@ public interface JournalReaderCallback { default void done() { } - void onReadAddRecord(RecordInfo info) throws Exception; + default void onReadAddRecord(RecordInfo info) throws Exception { + } /** * @param recordInfo * @throws Exception */ - void onReadUpdateRecord(RecordInfo recordInfo) throws Exception; + default void onReadUpdateRecord(RecordInfo recordInfo) throws Exception { + } /** * @param recordID */ - void onReadDeleteRecord(long recordID) throws Exception; + default void onReadDeleteRecord(long recordID) throws Exception { + } + /** * @param transactionID * @param recordInfo * @throws Exception */ - void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + default void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception { + } /** * @param transactionID * @param recordInfo * @throws Exception */ - void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + default void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception { + + } /** * @param transactionID * @param recordInfo */ - void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + default void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception { + } /** * @param transactionID * @param extraData * @param numberOfRecords */ - void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception; + default void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception { + } /** * @param transactionID * @param numberOfRecords */ - void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception; + default void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception { + } /** * @param transactionID */ - void onReadRollbackRecord(long transactionID) throws Exception; + default void onReadRollbackRecord(long transactionID) throws Exception { + } - void markAsDataFile(JournalFile file); + default void markAsDataFile(JournalFile file) { + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 3e1a994573..eb59d5343f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -32,6 +32,7 @@ import javax.management.NotificationListener; import javax.transaction.xa.Xid; import java.net.URL; import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -115,6 +116,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation; +import org.apache.activemq.artemis.core.server.replay.ReplayManager; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; @@ -4438,5 +4440,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active public void reloadConfigurationFile() throws Exception { server.reloadConfigurationFile(); } + + @Override + public void replay(String address, String target, String filter) throws Exception { + server.replay(null, null, address, target, filter); + } + + @Override + public void replay(String startScan, String endScan, String address, String target, String filter) throws Exception { + + SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat(); + + Date startScanDate = format.parse(startScan); + Date endScanDate = format.parse(endScan); + + server.replay(startScanDate, endScanDate, address, target, filter); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 509d5dfae7..f4e4a2aa20 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -75,6 +75,7 @@ import org.jboss.logging.Logger; public class JournalStorageManager extends AbstractJournalStorageManager { private static final Logger logger = Logger.getLogger(JournalStorageManager.class); + public static final String ACTIVEMQ_DATA = "activemq-data"; protected SequentialFileFactory journalFF; @@ -217,7 +218,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { protected Journal createMessageJournal(Configuration config, IOCriticalErrorListener criticalErrorListener, int fileSize) { - return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()); + return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, ACTIVEMQ_DATA, "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()); } // Life Cycle Handlers diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index f1e6170920..c6e45f243d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -515,4 +515,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229238, value = "No target to redirect the connection") ActiveMQRedirectedException cannotRedirect(); + + @Message(id = 229239, value = "There is not retention configured. In order to use the replay method you must specify journal-retention-directory element on the broker.xml") + IllegalArgumentException noRetention(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 27cb0d6d1b..dc5e53311c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server; import javax.management.MBeanServer; import java.util.Collection; +import java.util.Date; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -194,6 +195,8 @@ public interface ActiveMQServer extends ServiceComponent { */ void registerActivationFailureListener(ActivationFailureListener listener); + void replay(Date start, Date end, String address, String target, String filter) throws Exception; + /** * Remove a previously registered failure listener * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 841744697e..a66186dca0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -175,6 +175,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugi import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; +import org.apache.activemq.artemis.core.server.replay.ReplayManager; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -266,6 +267,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private volatile ExecutorService ioExecutorPool; + private ReplayManager replayManager; + /** * This is a thread pool for io tasks only. * We can't use the same global executor to avoid starvations. @@ -388,9 +391,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } }; - // Constructors - // --------------------------------------------------------------------------------- - public ActiveMQServerImpl() { this(null, null, null); } @@ -468,6 +468,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.parentServer = parentServer; this.serviceRegistry = serviceRegistry == null ? new ServiceRegistryImpl() : serviceRegistry; + } @Override @@ -480,8 +481,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { return networkHealthCheck; } - // life-cycle methods - // ---------------------------------------------------------------- + + @Override + public void replay(Date start, Date end, String address, String target, String filter) throws Exception { + if (replayManager == null) { + throw ActiveMQMessageBundle.BUNDLE.noRetention(); + } + replayManager.replay(start, end, address, target, filter); + } /** * A Callback for tests @@ -598,6 +605,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { initializeCriticalAnalyzer(); + if (configuration.getJournalRetentionLocation() != null) { + this.replayManager = new ReplayManager(this); + } else { + this.replayManager = null; + } + startDate = new Date(); state = SERVER_STATE.STARTING; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java new file mode 100644 index 0000000000..a13609ec1c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.replay; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; +import org.jboss.logging.Logger; + +public class ReplayManager { + + public static SimpleDateFormat newRetentionSimpleDateFormat() { + return new SimpleDateFormat("yyyyMMddHHmmss"); + } + private static final Logger logger = Logger.getLogger(ReplayManager.class); + + private final ActiveMQServer server; + private JournalImpl journal; + private final File retentionFolder; + + private final SimpleDateFormat dateFormat = newRetentionSimpleDateFormat(); + + private final AtomicBoolean running = new AtomicBoolean(false); + + public ReplayManager(ActiveMQServer server) { + this.server = server; + this.retentionFolder = server.getConfiguration().getJournalRetentionLocation(); + } + + public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception { + + if (running.compareAndSet(false, true)) { + try { + actualReplay(start, end, sourceAddress, targetAddress, filter); + } catch (Exception e) { + logger.warn(e.getMessage()); + throw e; + } finally { + running.set(false); + } + } else { + throw new RuntimeException("Replay manager is currently busy with another operation"); + } + } + + private void actualReplay(Date start, Date end, String sourceAddress, String targetAddress, String filterStr) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Replay::" + sourceAddress); + } + if (sourceAddress == null) { + throw new NullPointerException("sourceAddress"); + } + + if (journal == null) { + // notice this routing plays single threaded. no need for any sort of synchronization here + journal = (JournalImpl)server.getStorageManager().getMessageJournal(); + } + + Filter filter; + + if (filterStr != null) { + filter = FilterImpl.createFilter(filterStr); + } else { + filter = null; + } + + journal.forceBackup(1, TimeUnit.MINUTES); + + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(retentionFolder, null, 1); + + // Will use only default values. The load function should adapt to anything different + JournalImpl messagesJournal = new JournalImpl(server.getConfiguration().getJournalFileSize(), server.getConfiguration().getJournalMinFiles(), server.getConfiguration().getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1); + + List files = messagesJournal.orderFiles(); + + RoutingContext context = new RoutingContextImpl(null); + + HashMap> largeMessageLocations = new HashMap<>(); + + for (JournalFile file : files) { + if (start != null || end != null) { + long fileEpochTime = journal.getDatePortionMillis(file.getFile().getFileName()); + + if (logger.isDebugEnabled()) { + String datePortion = journal.getDatePortion(file.getFile().getFileName()); + logger.debug("Evaluating replay for file " + file.getFile().getFileName() + ", datePortion=" + datePortion + "\n" + + "\tInterval evaluated: start(" + start + ") --- file(" + new Date(fileEpochTime) + ") --- end(" + end + ")\n" + + "\tepoch times: start(" + start.getTime() + ") --- file(" + fileEpochTime + ") + end(" + end.getTime() + ")"); + } + + if (start != null && fileEpochTime < start.getTime()) { + if (logger.isDebugEnabled()) { + logger.debug("File " + file.getFile().getFileName() + " being skipped on start comparison"); + } + continue; + } + + if (end != null && fileEpochTime > end.getTime()) { + if (logger.isDebugEnabled()) { + logger.debug("File " + file.getFile().getFileName() + " being skipped on end comparison"); + } + continue; + } + } + JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() { + @Override + public void onReadEventRecord(RecordInfo info) throws Exception { + switch (info.getUserRecordType()) { + case JournalRecordIds.ADD_MESSAGE_BODY: + LinkedHashSet files = largeMessageLocations.get(info.id); + if (files == null) { + files = new LinkedHashSet<>(); + largeMessageLocations.put(info.id, files); + } + files.add(file); + break; + + default: + onReadAddRecord(info); + } + } + + @Override + public void onReadAddRecord(RecordInfo info) throws Exception { + if (info.getUserRecordType() == JournalRecordIds.ADD_LARGE_MESSAGE) { + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(info.data); + LargeServerMessage message = new LargeServerMessageImpl(server.getStorageManager()); + LargeMessagePersister.getInstance().decode(buffer, message, null); + route(filter, context, messagesFF, message.toMessage(), sourceAddress, targetAddress, largeMessageLocations); + } else if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE_PROTOCOL) { + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(info.data); + Message message = MessagePersister.getInstance().decode(buffer, null, null, server.getStorageManager()); + route(filter, context, messagesFF, message, sourceAddress, targetAddress, largeMessageLocations); + } + + } + + @Override + public void onReadUpdateRecord(RecordInfo info) throws Exception { + onReadAddRecord(info); + } + + @Override + public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception { + onReadAddRecord(info); + } + + @Override + public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception { + onReadUpdateRecord(info); + } + + }, null, false, null); + } + } + + private boolean messageMatch(Filter filter, Message message, String sourceAddress, String targetAddress) { + if (message.getAddress() != null && message.getAddress().equals(sourceAddress)) { + if (filter != null) { + if (!filter.match(message)) { + return false; + } + } + if (targetAddress != null && !targetAddress.equals(sourceAddress)) { + message.setAddress(targetAddress); + message.reencode(); + } + return true; + } else { + return false; + } + } + + + private void route(Filter filter, RoutingContext context, SequentialFileFactory messagesFF, Message message, String sourceAddress, String targetAddress, HashMap> filesMap) throws Exception { + if (messageMatch(filter, message, sourceAddress, targetAddress)) { + final long originalMessageID = message.getMessageID(); + message.setMessageID(server.getStorageManager().generateID()); + if (message.isLargeMessage()) { + readLargeMessageBody(messagesFF, message, filesMap, originalMessageID); + } + if (targetAddress != null && !sourceAddress.equals(targetAddress)) { + message.setAddress(targetAddress); + message.reencode(); + } + server.getPostOffice().route(message, context, false, false, null); + context.clear(); + } else { + if (message.isLargeMessage()) { + filesMap.remove(message.getMessageID()); + } + } + } + + private void readLargeMessageBody(SequentialFileFactory messagesFF, + Message message, + HashMap> filesMap, + long originalMessageID) throws Exception { + long newMessageID = message.getMessageID(); + SequentialFile largeMessageFile = server.getStorageManager().createFileForLargeMessage(newMessageID, true); + largeMessageFile.open(); + + LinkedHashSet files = filesMap.get(originalMessageID); + if (files != null) { + for (JournalFile file : files) { + JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() { + @Override + public void onReadEventRecord(RecordInfo info) throws Exception { + if (info.userRecordType == JournalRecordIds.ADD_MESSAGE_BODY && info.id == originalMessageID) { + server.getStorageManager().addBytesToLargeMessage(largeMessageFile, newMessageID, info.data); + } + } + }); + } + } + largeMessageFile.close(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 24c753108b..874637baba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -19,16 +19,20 @@ package org.apache.activemq.artemis.tests.integration.management; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import javax.json.JsonArray; import javax.json.JsonObject; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -87,6 +91,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RetryMethod; @@ -4079,6 +4084,108 @@ public class ActiveMQServerControlTest extends ManagementTestBase { } } + @Test + public void testReplayWithoutDate() throws Exception { + testReplaySimple(false); + } + + @Test + public void testReplayWithDate() throws Exception { + testReplaySimple(true); + } + + private void testReplaySimple(boolean useDate) throws Exception { + ActiveMQServerControl serverControl = createManagementControl(); + String queue = "testQueue" + RandomUtil.randomString(); + server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = session.createQueue(queue); + MessageProducer producer = session.createProducer(jmsQueue); + producer.send(session.createTextMessage("before")); + + connection.start(); + MessageConsumer consumer = session.createConsumer(jmsQueue); + Assert.assertNotNull(consumer.receive(5000)); + Assert.assertNull(consumer.receiveNoWait()); + + serverControl.replay(queue, queue, null); + Assert.assertNotNull(consumer.receive(5000)); + Assert.assertNull(consumer.receiveNoWait()); + + if (useDate) { + serverControl.replay("dontexist", "dontexist", null); // just to force a move next file, and copy stuff into place + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); + Thread.sleep(1000); // waiting a second just to have the timestamp change + String dateEnd = format.format(new Date()); + Thread.sleep(1000); // waiting a second just to have the timestamp change + String dateStart = "19800101000000"; + + + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("after receiving")); + } + for (int i = 0; i < 100; i++) { + Assert.assertNotNull(consumer.receive()); + } + Assert.assertNull(consumer.receiveNoWait()); + serverControl.replay(dateStart, dateEnd, queue, queue, null); + for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("before", message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + } else { + serverControl.replay(queue, queue, null); + + // replay of the replay, there will be two messages + for (int i = 0; i < 2; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + Assert.assertNull(consumer.receiveNoWait()); + } + } + } + + + @Test + public void testReplayFilter() throws Exception { + ActiveMQServerControl serverControl = createManagementControl(); + String queue = "testQueue" + RandomUtil.randomString(); + server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue)); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = session.createQueue(queue); + MessageProducer producer = session.createProducer(jmsQueue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setIntProperty("i", i); + producer.send(message); + } + + connection.start(); + MessageConsumer consumer = session.createConsumer(jmsQueue); + for (int i = 0; i < 10; i++) { + Assert.assertNotNull(consumer.receive(5000)); + } + Assert.assertNull(consumer.receiveNoWait()); + + serverControl.replay(queue, queue, "i=5"); + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(5, message.getIntProperty("i")); + Assert.assertEquals("message 5", message.getText()); + Assert.assertNull(consumer.receiveNoWait()); + } + } + @Test public void testBrokerConnections() throws Exception { @@ -4130,7 +4237,6 @@ public class ActiveMQServerControlTest extends ManagementTestBase { } } - protected void scaleDown(ScaleDownHandler handler) throws Exception { SimpleString address = new SimpleString("testQueue"); HashMap params = new HashMap<>(); @@ -4200,6 +4306,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { securityConfiguration.addRole("myUser", "guest"); securityConfiguration.setDefaultUser("guest"); ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration); + conf.setJournalRetentionDirectory(conf.getJournalDirectory() + "_ret"); // needed for replay tests server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, securityManager, true)); server.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index ab060fac52..9c42638ec3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -1645,6 +1645,20 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes @Parameter(name = "Page Size") int pageSize) throws Exception { return (String) proxy.invokeOperation("listQueues", options, page, pageSize); } + + @Override + public void replay(String address, String target, String filter) throws Exception { + proxy.invokeOperation("replay", address, target, filter); + } + + @Override + public void replay(String startScan, + String endScan, + String address, + String target, + String filter) throws Exception { + proxy.invokeOperation("replay", startScan, endScan, address, target, filter); + } }; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayTest.java new file mode 100644 index 0000000000..5619c8a6ab --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/retention/ReplayTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.retention; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Assert; +import org.junit.Test; + +public class ReplayTest extends ActiveMQTestBase { + + ActiveMQServer server; + + @Override + public void setUp() throws Exception { + super.setUp(); + + server = addServer(createServer(true, true)); + server.getConfiguration().setJournalRetentionDirectory(getJournalDir() + "retention"); + server.getConfiguration().setJournalFileSize(100 * 1024); + + server.start(); + + server.addAddressInfo(new AddressInfo("t1").addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration("t1").setAddress("t1").setRoutingType(RoutingType.ANYCAST)); + + server.addAddressInfo(new AddressInfo("t2").addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration("t2").setAddress("t2").setRoutingType(RoutingType.ANYCAST)); + } + + @Test + public void testReplayAMQP() throws Exception { + testReplay("AMQP", 10); + } + + @Test + public void testReplayCore() throws Exception { + testReplay("CORE", 10); + } + + public void testReplay(String protocol, int size) throws Exception { + + StringBuffer buffer = new StringBuffer(); + buffer.append(RandomUtil.randomString()); + for (int i = 0; i < size; i++) { + buffer.append("*"); + } + + ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue("t1"); + + MessageProducer producer = session.createProducer(null); + + producer.send(queue, session.createTextMessage(buffer.toString())); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + Assert.assertNotNull(consumer.receive(5000)); + + Assert.assertNull(consumer.receiveNoWait()); + + server.replay(null, null, "t1", "t2", null); + + Queue t2 = session.createQueue("t2"); + + MessageConsumer consumert2 = session.createConsumer(t2); + + TextMessage receivedMessage = (TextMessage) consumert2.receive(5000); + + Assert.assertNotNull(receivedMessage); + + Assert.assertEquals(buffer.toString(), receivedMessage.getText()); + + Assert.assertNull(consumert2.receiveNoWait()); + + server.replay(null, null, "t2", "t1", null); + + receivedMessage = (TextMessage) consumer.receive(5000); + + Assert.assertNotNull(receivedMessage); + + Assert.assertNull(consumer.receiveNoWait()); + + // invalid filter.. nothing should be re played + server.replay(null, null, "t1", "t1", "foo='foo'"); + + Assert.assertNull(consumer.receiveNoWait()); + } + + } + + @Test + public void testReplayLargeAMQP() throws Exception { + testReplay("AMQP", 500 * 1024); + } + + @Test + public void testReplayLargeCore() throws Exception { + testReplay("CORE", 500 * 1024); + } + +} diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 95a2d212f1..790a4237fd 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -1159,6 +1159,28 @@ ${basedir}/target/classes/servers/bridgeTransfer/serverB + + + test-compile + create-replay + + create + + + amq + artemis + artemis + true + false + ${basedir}/target/replay/replay + ${basedir}/target/classes/servers/replay/replay + + + --java-options + -Djava.rmi.server.hostname=localhost + + + diff --git a/tests/smoke-tests/src/main/resources/servers/replay/replay/broker.xml b/tests/smoke-tests/src/main/resources/servers/replay/replay/broker.xml new file mode 100644 index 0000000000..25601c67ce --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replay/replay/broker.xml @@ -0,0 +1,245 @@ + + + + + + + + replay + + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + ./data/retention + + + + true + + 2 + + 10 + + 4096 + + 10M + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + + + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + false + false + + + + +

+ + + +
+
+ + + +
+ +
+ + + +
+ + + + + + + + diff --git a/tests/smoke-tests/src/main/resources/servers/replay/replay/management.xml b/tests/smoke-tests/src/main/resources/servers/replay/replay/management.xml new file mode 100644 index 0000000000..1677deba0a --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replay/replay/management.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java index 67eccef1f2..0090f98d59 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.smoke.common; +import javax.management.MBeanServerInvocationHandler; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -26,6 +27,8 @@ import java.net.MalformedURLException; import java.util.HashSet; import java.util.Set; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.cli.commands.Stop; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -99,6 +102,32 @@ public class SmokeTestBase extends ActiveMQTestBase { return getJmxConnector(JMX_SERVER_HOSTNAME, JMX_SERVER_PORT); } + protected static JMXConnector newJMXFactory(String uri) throws Throwable { + return JMXConnectorFactory.connect(new JMXServiceURL(uri)); + } + + protected static ActiveMQServerControl getServerControl(String uri, + ObjectNameBuilder builder, + long timeout) throws Throwable { + long expireLoop = System.currentTimeMillis() + timeout; + Throwable lastException = null; + do { + try { + JMXConnector connector = newJMXFactory(uri); + + ActiveMQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), builder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false); + serverControl.isActive(); // making one call to make sure it's working + return serverControl; + } catch (Throwable e) { + lastException = e; + Thread.sleep(500); + } + } + while (expireLoop > System.currentTimeMillis()); + + throw lastException; + } + protected static JMXConnector getJmxConnector(String hostname, int port) throws MalformedURLException { // Without this, the RMI server would bind to the default interface IP (the user's local IP mostly) System.setProperty("java.rmi.server.hostname", hostname); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/dnsswitch/DNSSwitchTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/dnsswitch/DNSSwitchTest.java index b29b0c65cc..9eb2624a68 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/dnsswitch/DNSSwitchTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/dnsswitch/DNSSwitchTest.java @@ -23,10 +23,6 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.management.MBeanServerInvocationHandler; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -70,77 +66,40 @@ import org.junit.Test; */ public class DNSSwitchTest extends SmokeTestBase { - private static boolean USING_SPAWN = true; public static final File ETC_HOSTS = new File("/etc/hosts"); - - private static File ETC_BACKUP; - private static final String JMX_SERVER_HOSTNAME = "localhost"; private static final int JMX_SERVER_PORT_0 = 10099; private static final int JMX_SERVER_PORT_1 = 10199; - - static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi"; - static String backupURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi"; - - static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "live", true); - static ObjectNameBuilder backupNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "backup", true); - - // This is a more intrusive option to use with JDK 8 - // Instead of using a separate jdk hsots, which is not supported on jdk8, - // with this option set to true we would use the original /etc/hosts - private static boolean USE_ETC_HOSTS = System.getProperty("java.version").startsWith("1.8"); - private static final Logger logger = Logger.getLogger(DNSSwitchTest.class); - private static final String SERVER_NAME_0 = "dnsswitch"; private static final String SERVER_NAME_1 = "dnsswitch2"; private static final String SERVER_STANDARD = "standard"; private static final String SERVER_LIVE = "dnsswitch-replicated-main"; private static final String SERVER_LIVE_NORETRYDNS = "dnsswitch-replicated-main-noretrydns"; private static final String SERVER_BACKUP = "dnsswitch-replicated-backup"; - private static final String SERVER_LIVE_PING = "dnsswitch-replicated-main-withping"; private static final String SERVER_BACKUP_PING = "dnsswitch-replicated-backup-withping"; - // 192.0.2.0 is reserved for documentation (and testing on this case). private static final String FIRST_IP = "192.0.2.0"; private static final String SECOND_IP = "192.0.3.0"; private static final String THIRD_IP = "192.0.3.0"; private static final String FOURTH_IP = "192.0.4.0"; - private static final String INVALID_IP = "203.0.113.0"; - + private static final String hostsFile = System.getProperty("jdk.net.hosts.file"); + static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi"; + static String backupURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_1 + "/jmxrmi"; + static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "live", true); + static ObjectNameBuilder backupNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "backup", true); + private static boolean USING_SPAWN = true; + private static File ETC_BACKUP; + // This is a more intrusive option to use with JDK 8 + // Instead of using a separate jdk hsots, which is not supported on jdk8, + // with this option set to true we would use the original /etc/hosts + private static boolean USE_ETC_HOSTS = System.getProperty("java.version").startsWith("1.8"); private static String serverLocation; - @Rule public NetUtilResource netUtilResource = new NetUtilResource(); - private static JMXConnector newJMXFactory(String uri) throws Throwable { - return JMXConnectorFactory.connect(new JMXServiceURL(uri)); - } - - private static ActiveMQServerControl getServerControl(String uri, - ObjectNameBuilder builder, - long timeout) throws Throwable { - long expireLoop = System.currentTimeMillis() + timeout; - Throwable lastException = null; - do { - try { - JMXConnector connector = newJMXFactory(uri); - - ActiveMQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), builder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false); - serverControl.isActive(); // making one call to make sure it's working - return serverControl; - } catch (Throwable e) { - lastException = e; - Thread.sleep(500); - } - } - while (expireLoop > System.currentTimeMillis()); - - throw lastException; - } - @BeforeClass public static void beforeClassMethod() throws Exception { NetUtil.skipIfNotSupportedOS(); @@ -233,7 +192,6 @@ public class DNSSwitchTest extends SmokeTestBase { Assert.assertTrue("You must send pairs as overrideParameters", overrideParameters.length % 2 == 0); - String javaVersion = System.getProperty("java.version"); File security; @@ -254,25 +212,6 @@ public class DNSSwitchTest extends SmokeTestBase { securityProperties.store(new FileOutputStream(outputSecurity), "# generated by DNSSwitchTest"); } - private static final String hostsFile = System.getProperty("jdk.net.hosts.file"); - - @Before - public void before() throws Exception { - cleanupData(SERVER_NAME_0); - cleanupData(SERVER_NAME_1); - cleanupData(SERVER_STANDARD); - cleanupData(SERVER_LIVE); - cleanupData(SERVER_LIVE_NORETRYDNS); - cleanupData(SERVER_BACKUP); - cleanupData(SERVER_LIVE_PING); - cleanupData(SERVER_BACKUP_PING); - } - - @Test - public void testBackupRedefinition() throws Throwable { - spawnRun(serverLocation, "testBackupRedefinition", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); - } - public static void testBackupRedefinition(String[] args) throws Throwable { NetUtil.netUp(FIRST_IP, "lo:first"); NetUtil.netUp(SECOND_IP, "lo:second"); @@ -359,12 +298,6 @@ public class DNSSwitchTest extends SmokeTestBase { } - - @Test - public void testBackupRedefinition2() throws Throwable { - spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); - } - public static void testBackupRedefinition2(String[] args) throws Throwable { NetUtil.netUp(FIRST_IP, "lo:first"); NetUtil.netUp(SECOND_IP, "lo:second"); @@ -396,7 +329,6 @@ public class DNSSwitchTest extends SmokeTestBase { Wait.assertTrue(backupControl::isStarted); Wait.assertTrue(backupControl::isReplicaSync); - saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND"); serverBackup.destroyForcibly(); serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0); @@ -459,12 +391,6 @@ public class DNSSwitchTest extends SmokeTestBase { } - - @Test - public void testBackupRedefinition3() throws Throwable { - spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); - } - public static void testBackupRedefinition3(String[] args) throws Throwable { NetUtil.netUp(FIRST_IP, "lo:first"); NetUtil.netUp(SECOND_IP, "lo:second"); @@ -496,7 +422,6 @@ public class DNSSwitchTest extends SmokeTestBase { Wait.assertTrue(backupControl::isStarted); Wait.assertTrue(backupControl::isReplicaSync); - saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND"); serverBackup.destroyForcibly(); serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0); @@ -556,17 +481,10 @@ public class DNSSwitchTest extends SmokeTestBase { serverLive.destroyForcibly(); } - } } - - @Test - public void testCantReachBack() throws Throwable { - spawnRun(serverLocation, "testCantReachBack", getServerLocation(SERVER_LIVE_NORETRYDNS), getServerLocation(SERVER_BACKUP)); - } - public static void testCantReachBack(String[] args) throws Throwable { NetUtil.netUp(FIRST_IP, "lo:first"); NetUtil.netUp(SECOND_IP, "lo:second"); @@ -604,12 +522,6 @@ public class DNSSwitchTest extends SmokeTestBase { } - - @Test - public void testWithPing() throws Throwable { - spawnRun(serverLocation, "testWithPing", getServerLocation(SERVER_LIVE_PING), getServerLocation(SERVER_BACKUP_PING)); - } - public static void testWithPing(String[] args) throws Throwable { NetUtil.netUp(FIRST_IP, "lo:first"); NetUtil.netUp(SECOND_IP, "lo:second"); @@ -640,14 +552,12 @@ public class DNSSwitchTest extends SmokeTestBase { serverBackup.destroyForcibly(); - //Thread.sleep(10_000); serverLive.destroyForcibly(); serverLive = ServerUtil.startServer(args[1], "live", "tcp://FIRST:61616", 0); Thread.sleep(1_000); - logger.debug("going to re-enable ping"); // Enable the address just for ping now saveConf(hostsFile, THIRD_IP, "PINGPLACE"); @@ -683,25 +593,16 @@ public class DNSSwitchTest extends SmokeTestBase { serverLive.destroyForcibly(); } - } } - @Test - public void testWithoutPingKill() throws Throwable { - spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "1"); - } - - @Test - public void testWithoutPingRestart() throws Throwable { - spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "0"); - } /** * arg[0] = constant "testWithoutPing" to be used on reflection through main(String arg[]) * arg[1] = serverlive * arg[2] = server backup * arg[3] = 1 | 0 (kill the backup = 1, stop the backup = 0); + * * @param args * @throws Throwable */ @@ -778,12 +679,10 @@ public class DNSSwitchTest extends SmokeTestBase { serverLive.destroyForcibly(); } - } } - private static void connectAndWaitBackup() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=true"); Assert.assertTrue(connectionFactory.getServerLocator().isHA()); @@ -792,12 +691,6 @@ public class DNSSwitchTest extends SmokeTestBase { connection.close(); } - @Test - public void testFailoverDifferentIPRedefinition() throws Throwable { - - spawnRun(serverLocation, "testFailoverDifferentIPRedefinition", serverLocation, getServerLocation(SERVER_NAME_1)); - } - public static void testFailoverDifferentIPRedefinition(String[] arg) throws Throwable { NetUtil.netUp(FIRST_IP); NetUtil.netUp(SECOND_IP); @@ -850,20 +743,6 @@ public class DNSSwitchTest extends SmokeTestBase { } - @Test - public void testInitialConnector() throws Throwable { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://test:61616?initialConnectAttempts=500&retryInterval=100&connect-timeout-millis=100"); - startServer(SERVER_STANDARD, 0, 30000); - - String location = getServerLocation(SERVER_NAME_0); - - spawnRun(location, "testInitialConnector"); - // If you eed to debug the test, comment out spawnRun, and call the method directly - // you will need to add roperties on the JDK for that - // Add the properties you need - // testInitialConnector("testInitialConnector", location); - } - // called with reflection public static void testInitialConnector(String... arg) throws Throwable { saveConf(hostsFile, "192.0.0.3", "test"); @@ -903,12 +782,6 @@ public class DNSSwitchTest extends SmokeTestBase { Assert.assertFalse(connecting.isAlive()); } - // This test is just validating the DNS is not being cached on the separte VM - @Test - public void testSimpleResolution() throws Throwable { - spawnRun(serverLocation, "testSimpleResolution"); - } - // called with reflection public static void testSimpleResolution(String[] arg) throws Throwable { // This is just to validate the DNS hosts is picking up the right host @@ -920,11 +793,6 @@ public class DNSSwitchTest extends SmokeTestBase { validateIP("test", "192.0.0.3"); } - @Test - public void testSplitBrainDetection() throws Throwable { - spawnRun(serverLocation, "testSplitBrainDetection", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); - } - /** * arg[0] = constant "testSplitBrainDetection" to be used on reflection through main(String arg[]) * arg[1] = serverlive @@ -975,8 +843,7 @@ public class DNSSwitchTest extends SmokeTestBase { Wait.assertTrue(() -> !liveControl.isReplicaSync()); logger.debug("Waiting enough to let live spread its topology around"); - try (ActiveMQConnectionFactory firstCf = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=false"); - Connection ignored = firstCf.createConnection()) { + try (ActiveMQConnectionFactory firstCf = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=false"); Connection ignored = firstCf.createConnection()) { waitForTopology(firstCf.getServerLocator().getTopology(), 60_000, 1, 1); final Topology topology = firstCf.getServerLocator().getTopology(); final TopologyMemberImpl member = topology.getMember(liveControl.getNodeID()); @@ -993,8 +860,7 @@ public class DNSSwitchTest extends SmokeTestBase { Assert.assertEquals("SECOND", backup.getParams().get("host")); Assert.assertEquals("61716", backup.getParams().get("port")); } - try (ActiveMQConnectionFactory secondCf = new ActiveMQConnectionFactory("tcp://SECOND:61716?ha=false"); - Connection ignored = secondCf.createConnection()) { + try (ActiveMQConnectionFactory secondCf = new ActiveMQConnectionFactory("tcp://SECOND:61716?ha=false"); Connection ignored = secondCf.createConnection()) { logger.debug("Waiting until second broker topology has just a single live node"); waitForTopology(secondCf.getServerLocator().getTopology(), 60_000, 1, 0); final Topology topology = secondCf.getServerLocator().getTopology(); @@ -1018,33 +884,6 @@ public class DNSSwitchTest extends SmokeTestBase { } } - /** - * it will continue the test on a spwned VM with the properties we need for this test - */ - private void spawnRun(String location, String... args) throws Throwable { - // We have to run part of the test on a separate VM, as we need VM settings to tweak the DNS - - String securityProperties = System.getProperty("java.security.properties"); - - if (securityProperties != null && securityProperties.equals(location + "/etc/zerocache.security")) { - logger.info("No need to spawn a VM, the zerocache is already in place"); - System.setProperty("artemis.config.location", location); - USING_SPAWN = false; - main(args); - } else { - - securityProperties = "-Djava.security.properties=" + location + "/etc/zerocache.security"; - String hostProperties = "-Djdk.net.hosts.file=" + location + "/etc/hosts.conf"; - String configLocation = "-Dartemis.config.location=" + location; - String temporaryLocation = "-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir"); - - logger.info("if you would like to run without Spawn for debugging purposes, add these properties to your VM arguments on this test: " + securityProperties + " " + hostProperties); - Process p = SpawnedVMSupport.spawnVM(DNSSwitchTest.class.getName(), new String[]{securityProperties, hostProperties, configLocation, temporaryLocation}, args); - addProcess(p); - Assert.assertEquals(1, p.waitFor()); - } - } - public static void saveConf(String fileName, String... hostDefinition) throws Exception { if (USE_ETC_HOSTS) { recoverETCHosts(); @@ -1108,4 +947,109 @@ public class DNSSwitchTest extends SmokeTestBase { } } + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + cleanupData(SERVER_NAME_1); + cleanupData(SERVER_STANDARD); + cleanupData(SERVER_LIVE); + cleanupData(SERVER_LIVE_NORETRYDNS); + cleanupData(SERVER_BACKUP); + cleanupData(SERVER_LIVE_PING); + cleanupData(SERVER_BACKUP_PING); + } + + @Test + public void testBackupRedefinition() throws Throwable { + spawnRun(serverLocation, "testBackupRedefinition", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); + } + + @Test + public void testBackupRedefinition2() throws Throwable { + spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); + } + + @Test + public void testBackupRedefinition3() throws Throwable { + spawnRun(serverLocation, "testBackupRedefinition2", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); + } + + @Test + public void testCantReachBack() throws Throwable { + spawnRun(serverLocation, "testCantReachBack", getServerLocation(SERVER_LIVE_NORETRYDNS), getServerLocation(SERVER_BACKUP)); + } + + @Test + public void testWithPing() throws Throwable { + spawnRun(serverLocation, "testWithPing", getServerLocation(SERVER_LIVE_PING), getServerLocation(SERVER_BACKUP_PING)); + } + + @Test + public void testWithoutPingKill() throws Throwable { + spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "1"); + } + + @Test + public void testWithoutPingRestart() throws Throwable { + spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "0"); + } + + @Test + public void testFailoverDifferentIPRedefinition() throws Throwable { + + spawnRun(serverLocation, "testFailoverDifferentIPRedefinition", serverLocation, getServerLocation(SERVER_NAME_1)); + } + + @Test + public void testInitialConnector() throws Throwable { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://test:61616?initialConnectAttempts=500&retryInterval=100&connect-timeout-millis=100"); + startServer(SERVER_STANDARD, 0, 30000); + + String location = getServerLocation(SERVER_NAME_0); + + spawnRun(location, "testInitialConnector"); + // If you eed to debug the test, comment out spawnRun, and call the method directly + // you will need to add roperties on the JDK for that + // Add the properties you need + // testInitialConnector("testInitialConnector", location); + } + + // This test is just validating the DNS is not being cached on the separte VM + @Test + public void testSimpleResolution() throws Throwable { + spawnRun(serverLocation, "testSimpleResolution"); + } + + @Test + public void testSplitBrainDetection() throws Throwable { + spawnRun(serverLocation, "testSplitBrainDetection", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP)); + } + + /** + * it will continue the test on a spwned VM with the properties we need for this test + */ + private void spawnRun(String location, String... args) throws Throwable { + // We have to run part of the test on a separate VM, as we need VM settings to tweak the DNS + + String securityProperties = System.getProperty("java.security.properties"); + + if (securityProperties != null && securityProperties.equals(location + "/etc/zerocache.security")) { + logger.info("No need to spawn a VM, the zerocache is already in place"); + System.setProperty("artemis.config.location", location); + USING_SPAWN = false; + main(args); + } else { + + securityProperties = "-Djava.security.properties=" + location + "/etc/zerocache.security"; + String hostProperties = "-Djdk.net.hosts.file=" + location + "/etc/hosts.conf"; + String configLocation = "-Dartemis.config.location=" + location; + String temporaryLocation = "-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir"); + + logger.info("if you would like to run without Spawn for debugging purposes, add these properties to your VM arguments on this test: " + securityProperties + " " + hostProperties); + Process p = SpawnedVMSupport.spawnVM(DNSSwitchTest.class.getName(), new String[]{securityProperties, hostProperties, configLocation, temporaryLocation}, args); + addProcess(p); + Assert.assertEquals(1, p.waitFor()); + } + } + } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/retention/ReplayTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/retention/ReplayTest.java new file mode 100644 index 0000000000..cde5b4d6d1 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/retention/ReplayTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.retention; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ReplayTest extends SmokeTestBase { + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT_0 = 1099; + static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi"; + static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "replay", true); + + public static final String SERVER_NAME_0 = "replay/replay"; + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + disableCheckThread(); + } + + @Test + public void testReplayAMQP() throws Throwable { + testReplay("AMQP", 300, 100); + } + + @Test + public void testReplayAMQPLarge() throws Throwable { + testReplay("AMQP", 3, 200 * 1024); + } + + @Test + public void testReplayCore() throws Throwable { + testReplay("CORE", 300, 100); + } + + @Test + public void testReplayCoreLarge() throws Throwable { + testReplay("CORE", 3, 200 * 1024); + } + + private void testReplay(String protocol, int NUMBER_OF_MESSAGES, int bodySize) throws Throwable { + + final String queueName = "RetentionTest"; + + ActiveMQServerControl serverControl = getServerControl(liveURI, liveNameBuilder, 5000); + + String bufferStr; + { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < bodySize; i++) { + buffer.append("*"); + } + bufferStr = RandomUtil.randomString() + buffer.toString(); + } + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(null); + + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage(bufferStr); + message.setIntProperty("i", i); + producer.send(queue, message); + } + session.commit(); + + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(bufferStr, message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + session.commit(); + + serverControl.replay(queueName, queueName, null); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(bufferStr, message.getText()); + } + Assert.assertNull(consumer.receiveNoWait()); + session.commit(); + + serverControl.replay(queueName, queueName, "i=1"); + + for (int i = 0; i < 2; i++) { // replay of a replay will give you 2 messages + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(1, message.getIntProperty("i")); + Assert.assertEquals(bufferStr, message.getText()); + } + + Assert.assertNull(consumer.receiveNoWait()); + session.commit(); + + serverControl.replay(queueName, queueName, "foo='foo'"); + Assert.assertNull(consumer.receiveNoWait()); + } + } + +}