Moving artemis-tools to artemis-cli and improving the tooling

Artemis tools is now part of the cli
Bootstrap was renamed to CLI
This commit is contained in:
Clebert Suconic 2015-05-03 11:40:36 -04:00
parent dc58e431a5
commit ea3370b38c
76 changed files with 669 additions and 1797 deletions

View File

@ -23,7 +23,7 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Bootstrap</name>

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.cli;
import java.io.InputStream;
import java.io.OutputStream;
import io.airlift.command.Cli;
import io.airlift.command.ParseArgumentsUnexpectedException;
import org.apache.activemq.artemis.cli.commands.Action;
@ -24,9 +27,12 @@ import org.apache.activemq.artemis.cli.commands.Create;
import org.apache.activemq.artemis.cli.commands.HelpAction;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.HelpData;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
public class Artemis
{
@ -39,11 +45,15 @@ public class Artemis
.withCommand(HelpAction.class)
.withDefaultCommand(HelpAction.class);
builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode) (example ./artemis data print)").
withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class,
DecodeJournal.class, EncodeJournal.class);
if (instance != null)
{
builder = builder
.withCommand(Run.class)
.withCommand(Stop.class);
.withCommands(Run.class, Stop.class);
}
else
{

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.BufferedReader;
import java.io.File;
@ -27,70 +27,52 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
import org.apache.activemq.artemis.utils.Base64;
/**
* Use this class to import the journal data from a listed file. You can use it as a main class or
* through its native method
* {@link #importJournal(String, String, String, int, int, String)}
* <p>
* If you use the main method, use its arguments as:
*
* <pre>
* JournalDirectory JournalPrefix FileExtension MinFiles FileSize FileOutput
* </pre>
* <p>
* Example:
*
* <pre>
* java -cp activemq-core.jar org.apache.activemq.artemis.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
* </pre>
*/
public class ImportJournal
@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
public class DecodeJournal implements Action
{
// Constants -----------------------------------------------------
@Option(name = "--directory", description = "The journal folder (default ../data/journal)")
public String directory = "../data/journal";
// Attributes ----------------------------------------------------
@Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
public String prefix = "activemq-data";
// Static --------------------------------------------------------
@Option(name = "--suffix", description = "The journal suffix (default amq)")
public String suffix = "amq";
// Constructors --------------------------------------------------
@Option(name = "--file-size", description = "The journal size (default 10485760)")
public int size = 10485760;
// Public --------------------------------------------------------
@Arguments(description = "The input file name (default=exp.dmp)", required = true)
public String input;
public void process(final String[] arg)
public Object execute(ActionContext context) throws Exception
{
for (int i = 0; i < arg.length; i++)
{
System.out.println("arg[" + i + "] = " + arg[i]);
}
if (arg.length != 6)
{
for (int i = 0; i < arg.length; i++)
{
System.out.println("arg[" + i + "] = " + arg[i]);
}
printUsage();
System.exit(-1);
}
try
{
importJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
importJournal(directory, prefix, suffix, 2, size, input);
}
catch (Exception e)
{
e.printStackTrace();
}
return null;
}
public static void importJournal(final String directory,
final String journalPrefix,
final String journalSuffix,
@ -401,7 +383,6 @@ public class ImportJournal
}
System.err.println("This method will export the journal at low level record.");
System.err.println();
System.err.println(Main.USAGE + " import-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileInput>");
System.err.println();
for (int i = 0; i < 10; i++)
{

View File

@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.List;
import io.airlift.command.Command;
import io.airlift.command.Option;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@ -30,66 +33,69 @@ import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
import org.apache.activemq.artemis.utils.Base64;
/**
* Use this class to export the journal data. You can use it as a main class or through its static method {@link #exportJournal(String, String, String, int, int, String)}
* <p/>
* If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>
* <p/>
* Example: java -cp activemq-tools*-jar-with-dependencies.jar export-journal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
*/
public class ExportJournal
@Command(name = "encode", description = "Encode a set of journal files into an internal encoded data format")
public class EncodeJournal implements Action
{
// Constants -----------------------------------------------------
@Option(name = "--directory", description = "The journal folder (default ../data/journal)")
public String directory = "../data/journal";
// Attributes ----------------------------------------------------
@Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
public String prefix = "activemq-data";
// Static --------------------------------------------------------
@Option(name = "--suffix", description = "The journal suffix (default amq)")
public String suffix = "amq";
// Constructors --------------------------------------------------
@Option(name = "--file-size", description = "The journal size (default 10485760)")
public int size = 10485760;
// Public --------------------------------------------------------
public void process(final String[] arg)
public Object execute(ActionContext context) throws Exception
{
if (arg.length != 6)
{
for (int i = 0; i < arg.length; i++)
{
System.out.println("arg[" + i + "] = " + arg[i]);
}
printUsage();
System.exit(-1);
}
try
{
exportJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
exportJournal(directory, prefix, suffix, 2, size);
}
catch (Exception e)
{
e.printStackTrace();
}
return null;
}
public static void exportJournal(final String directory,
final String journalPrefix,
final String journalSuffix,
final int minFiles,
final int fileSize,
final String fileOutput) throws Exception
final int fileSize) throws Exception
{
FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out);
}
public static void exportJournal(final String directory,
final String journalPrefix,
final String journalSuffix,
final int minFiles,
final int fileSize,
final String fileName) throws Exception
{
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
PrintStream out = new PrintStream(bufferedOutputStream);
try
{
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
}
finally
{
out.close();
fileOutputStream.close();
}
PrintStream out = new PrintStream(buffOut);
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
out.close();
}
public static void exportJournal(final String directory,
@ -212,7 +218,6 @@ public class ExportJournal
}
System.err.println("This method will export the journal at low level record.");
System.err.println();
System.err.println(Main.USAGE + " export-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
System.err.println();
for (int i = 0; i < 10; i++)
{

View File

@ -14,31 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.MessageLogger;
package org.apache.activemq.artemis.cli.commands.tools;
/**
* Logger Code 24
*
* each message id must be 6 digits long starting with 10, the 3rd digit donates the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 241000 to 241999
*/
@MessageLogger(projectCode = "AMQ")
public interface ActiveMQToolsLogger extends BasicLogger
import java.util.ArrayList;
import java.util.List;
import io.airlift.command.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
public class HelpData extends Help implements Action
{
/**
* The default logger.
*/
ActiveMQToolsLogger LOGGER = Logger.getMessageLogger(ActiveMQToolsLogger.class, ActiveMQToolsLogger.class.getPackage().getName());
@Override
public Object execute(ActionContext context) throws Exception
{
List<String> commands = new ArrayList<>(1);
commands.add("data");
help(global, commands);
return null;
}
}

View File

@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
import java.util.ArrayList;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -27,15 +27,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.airlift.command.Command;
import io.airlift.command.Option;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
@ -47,23 +46,112 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.ExecutorFactory;
public class PrintPages // NO_UCD (unused code)
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData implements Action
{
public static void printPages(String pageDirectory, String messagesDirectory)
@Option(name = "--bindings", description = "The folder used for bindings (default ../data/bindings)")
public String binding = "../data/bindings";
@Option(name = "--journal", description = "The folder used for messages journal (default ../data/journal)")
public String journal = "../data/journal";
@Option(name = "--paging", description = "The folder used for paging (default ../data/paging)")
public String paging = "../data/paging";
@Override
public Object execute(ActionContext context) throws Exception
{
printData(binding, journal, paging);
return null;
}
public static void printData(String bindingsDirectory, String messagesDirectory, String pagingDirectory)
{
File serverLockFile = new File(messagesDirectory, "server.lock");
if (serverLockFile.isFile())
{
try
{
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
fileLock.start();
System.out.println("********************************************");
System.out.println("Server's ID=" + fileLock.getNodeId().toString());
System.out.println("********************************************");
fileLock.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
System.out.println("********************************************");
System.out.println("B I N D I N G S J O U R N A L");
System.out.println("********************************************");
try
{
DescribeJournal.describeBindingsJournal(bindingsDirectory);
}
catch (Exception e)
{
e.printStackTrace();
}
System.out.println();
System.out.println("********************************************");
System.out.println("M E S S A G E S J O U R N A L");
System.out.println("********************************************");
DescribeJournal describeJournal = null;
try
{
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory);
}
catch (Exception e)
{
e.printStackTrace();
return;
}
try
{
System.out.println();
System.out.println("********************************************");
System.out.println("P A G I N G");
System.out.println("********************************************");
printPages(pagingDirectory, describeJournal);
}
catch (Exception e)
{
e.printStackTrace();
return;
}
}
private static void printPages(String pageDirectory, DescribeJournal describeJournal)
{
try
{
PageCursorsInfo cursorACKs = PrintPages.loadCursorACKs(messagesDirectory);
PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
Set<Long> pgTXs = cursorACKs.getPgTXs();
@ -166,6 +254,73 @@ public class PrintPages // NO_UCD (unused code)
}
}
/** Calculate the acks on the page system */
protected static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception
{
PageCursorsInfo cursorInfo = new PageCursorsInfo();
for (RecordInfo record : records)
{
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR)
{
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
encoding.decode(buff);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
if (set == null)
{
set = new HashSet<PagePosition>();
cursorInfo.getCursorRecords().put(encoding.queueID, set);
}
set.add(encoding.position);
}
else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE)
{
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
encoding.decode(buff);
Long queueID = Long.valueOf(encoding.queueID);
Long pageNR = Long.valueOf(encoding.position.getPageNr());
if (!cursorInfo.getCompletePages(queueID).add(pageNR))
{
System.err.println("Page " + pageNR + " has been already set as complete on queue " + queueID);
}
}
else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION)
{
if (record.isUpdate)
{
JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
pageUpdate.decode(buff);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);
}
else
{
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
pageTransactionInfo.setRecordID(record.id);
cursorInfo.getPgTXs().add(pageTransactionInfo.getTransactionID());
}
}
}
return cursorInfo;
}
private static class PageCursorsInfo
{
private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
@ -220,92 +375,5 @@ public class PrintPages // NO_UCD (unused code)
}
/**
* @param journalLocation
* @return
* @throws Exception
*/
protected static PageCursorsInfo loadCursorACKs(final String journalLocation) throws Exception
{
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation, null);
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
defaultValues.getJournalMinFiles(),
0,
0,
messagesFF,
"activemq-data",
"amq",
1);
messagesJournal.start();
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();
messagesJournal.load(records, txs, null, false);
PageCursorsInfo cursorInfo = new PageCursorsInfo();
for (RecordInfo record : records)
{
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR)
{
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
if (set == null)
{
set = new HashSet<PagePosition>();
cursorInfo.getCursorRecords().put(encoding.queueID, set);
}
set.add(encoding.position);
}
else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE)
{
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Long queueID = Long.valueOf(encoding.queueID);
Long pageNR = Long.valueOf(encoding.position.getPageNr());
if (!cursorInfo.getCompletePages(queueID).add(pageNR))
{
System.err.println("Page " + pageNR + " has been already set as complete on queue " + queueID);
}
}
else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION)
{
if (record.isUpdate)
{
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);
}
else
{
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
pageTransactionInfo.setRecordID(record.id);
cursorInfo.getPgTXs().add(pageTransactionInfo.getTransactionID());
}
}
}
return cursorInfo;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
/**
* The constants shared by <code>org.apache.activemq.tools.XmlDataImporter</code> and

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
@ -36,6 +36,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.airlift.command.Command;
import io.airlift.command.Option;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -79,44 +83,36 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
/**
* Read the journal, page, and large-message data from a stopped instance of ActiveMQ and save it in an XML format to
* a file. It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity. Output can be
* read by <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataImporter</code>.
*/
public final class XmlDataExporter
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter implements Action
{
// Constants -----------------------------------------------------
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
// Attributes ----------------------------------------------------
private JournalStorageManager storageManager;
private final JournalStorageManager storageManager;
private Configuration config;
private final Configuration config;
private final XMLStreamWriter xmlWriter;
private XMLStreamWriter xmlWriter;
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();
// map of all message records hashed by their record ID (which will match the record ID of the message refs)
private final HashMap<Long, Message> messages;
private final HashMap<Long, Message> messages = new HashMap<>();
private final Map<Long, Set<PagePosition>> cursorRecords;
private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<>();
private final Set<Long> pgTXs;
private final Set<Long> pgTXs = new HashSet<>();
final HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
private final Map<String, PersistedConnectionFactory> jmsConnectionFactories = new ConcurrentHashMap<>();
@ -124,16 +120,31 @@ public final class XmlDataExporter
private final Map<Pair<PersistedType, String>, PersistedBindings> jmsJNDI = new ConcurrentHashMap<>();
@Option(name = "--bindings", description = "The folder used for bindings (default ../data/bindings)")
public String binding = "../data/bindings";
@Option(name = "--journal", description = "The folder used for messages journal (default ../data/journal)")
public String journal = "../data/journal";
@Option(name = "--paging", description = "The folder used for paging (default ../data/paging)")
public String paging = "../data/paging";
@Option(name = "--large-messages", description = "The folder used for large-messages (default ../data/largemessages)")
public String largeMessges = "../data/paging";
long messagesPrinted = 0L;
long bindingsPrinted = 0L;
// Static --------------------------------------------------------
@Override
public Object execute(ActionContext context) throws Exception
{
process(System.out, binding, journal, paging, largeMessges);
return null;
}
// Constructors --------------------------------------------------
public XmlDataExporter(OutputStream out, String bindingsDir, String journalDir, String pagingDir,
public void process(OutputStream out, String bindingsDir, String journalDir, String pagingDir,
String largeMessagesDir) throws Exception
{
config = new ConfigurationImpl()
@ -154,16 +165,6 @@ public final class XmlDataExporter
storageManager = new JournalStorageManager(config, executorFactory);
messageRefs = new HashMap<>();
messages = new HashMap<>();
cursorRecords = new HashMap<>();
pgTXs = new HashSet<>();
queueBindings = new HashMap<>();
XMLOutputFactory factory = XMLOutputFactory.newInstance();
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
@ -171,9 +172,11 @@ public final class XmlDataExporter
XMLStreamWriter.class.getClassLoader(),
new Class[]{XMLStreamWriter.class},
handler);
writeXMLData();
}
public void writeXMLData() throws Exception
private void writeXMLData() throws Exception
{
long start = System.currentTimeMillis();
getBindings();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
package org.apache.activemq.artemis.cli.commands.tools;
import javax.xml.stream.XMLInputFactory;
@ -34,15 +34,20 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -50,26 +55,27 @@ import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.UUIDGenerator;
/**
* Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
* send the messages to a running instance of ActiveMQ. It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
* for speed and simplicity.
*/
public final class XmlDataImporter
@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
public final class XmlDataImporter implements Action
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private final XMLStreamReader reader;
private XMLStreamReader reader;
// this session is really only needed if the "session" variable does not auto-commit sends
final ClientSession managementSession;
ClientSession managementSession;
final boolean localSession;
boolean localSession;
final Map<String, String> addressMap = new HashMap<>();
@ -77,13 +83,61 @@ public final class XmlDataImporter
String tempFileName = "";
private final ClientSession session;
private ClientSession session;
private boolean applicationServerCompatibility = false;
@Option(name = "--host", description = "The host used to import the data (default localhost)")
public String host = "localhost";
// Static --------------------------------------------------------
@Option(name = "--port", description = "The port used to import the data (default 61616)")
public int port = 61616;
// Constructors --------------------------------------------------
@Option(name = "--transaction", description = "If this is set to true you will need a whole transaction to commit at the end. (default false)")
public boolean transactional;
@Option(name = "--user", description = "User name used to import the data. (default null)")
public String user = null;
@Option(name = "--password", description = "User name used to import the data. (default null)")
public String password = null;
@Arguments(description = "The input file name (default=exp.dmp)", required = true)
public String input;
public String getPassword()
{
return password;
}
public void setPassword(String password)
{
this.password = password;
}
public String getUser()
{
return user;
}
public void setUser(String user)
{
this.user = user;
}
@Override
public Object execute(ActionContext context) throws Exception
{
process(input, host, port, transactional);
return null;
}
public void process(String inputFile, String host, int port, boolean transactional) throws Exception
{
this.process(new FileInputStream(inputFile), host, port, transactional);
}
/**
* This is the normal constructor for programmatic access to the
@ -96,59 +150,9 @@ public final class XmlDataImporter
* @param session used for sending messages, must use auto-commit for sends
* @throws Exception
*/
public XmlDataImporter(InputStream inputStream, ClientSession session) throws Exception
public void process(InputStream inputStream, ClientSession session) throws Exception
{
this(inputStream, session, null, false);
}
/**
* This is the normal constructor for programmatic access to the
* <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataImporter</code> if the session passed
* in uses auto-commit for sends.
* <p/>
* If the session needs to be transactional then use the constructor which takes 2 sessions.
*
* @param inputStream the stream from which to read the XML for import
* @param session used for sending messages, must use auto-commit for sends
* @param applicationServerCompatibility whether or not the JNDI entries for JMS connection factories and
* destinations should be compatible with JBoss AS 7.x, EAP 6.x,
* Wildfly, etc. which requires different bindings for local and remote
* clients
* @throws Exception
*/
public XmlDataImporter(InputStream inputStream, ClientSession session, boolean applicationServerCompatibility) throws Exception
{
this(inputStream, session, null, applicationServerCompatibility);
}
/**
* This is the constructor to use if you wish to import all messages transactionally.
* <p/>
* Pass in a session which doesn't use auto-commit for sends, and one that does (for management
* operations necessary during import).
*
* @param inputStream the stream from which to read the XML for import
* @param session used for sending messages, doesn't need to auto-commit sends
* @param managementSession used for management queries, must use auto-commit for sends
* @param applicationServerCompatibility whether or not the JNDI entries for JMS connection factories and
* destinations should be compatible with JBoss AS 7.x, EAP 6.x,
* Wildfly, etc. which requires different bindings for local and remote
* clients
*/
public XmlDataImporter(InputStream inputStream, ClientSession session, ClientSession managementSession, boolean applicationServerCompatibility) throws Exception
{
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
this.session = session;
if (managementSession != null)
{
this.managementSession = managementSession;
}
else
{
this.managementSession = session;
}
localSession = false;
this.applicationServerCompatibility = applicationServerCompatibility;
this.process(inputStream, session, null);
}
/**
@ -161,36 +165,53 @@ public final class XmlDataImporter
* @param session used for sending messages, doesn't need to auto-commit sends
* @param managementSession used for management queries, must use auto-commit for sends
*/
public XmlDataImporter(InputStream inputStream, ClientSession session, ClientSession managementSession) throws Exception
public void process(InputStream inputStream, ClientSession session, ClientSession managementSession) throws Exception
{
this(inputStream, session, managementSession, false);
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
this.session = session;
if (managementSession != null)
{
this.managementSession = managementSession;
}
else
{
this.managementSession = session;
}
localSession = false;
processXml();
}
public XmlDataImporter(InputStream inputStream, String host, String port, boolean transactional, boolean applicationServerCompatibility) throws Exception
public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception
{
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
ServerLocator serverLocator =
ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(
NettyConnectorFactory.class.getName(),
connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
session = sf.createSession(false, !transactional, true);
managementSession = sf.createSession(false, true, true);
if (user != null || password != null)
{
session = sf.createSession(user, password, false, !transactional, true, false, 0);
managementSession = sf.createSession(user, password, false, true, true, false, 0);
}
else
{
session = sf.createSession(false, !transactional, true);
managementSession = sf.createSession(false, true, true);
}
localSession = true;
this.applicationServerCompatibility = applicationServerCompatibility;
processXml();
}
public XmlDataImporter(String inputFile, String host, String port, boolean transactional, boolean applicationServerCompatibility) throws Exception
{
this(new FileInputStream(inputFile), host, port, transactional, applicationServerCompatibility);
}
// Public --------------------------------------------------------
public void processXml() throws Exception
private void processXml() throws Exception
{
try
{
@ -1003,10 +1024,6 @@ public final class XmlDataImporter
{
String elementText = reader.getElementText();
entry.append(elementText).append(", ");
if (applicationServerCompatibility)
{
entry.append(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX).append(elementText).append(", ");
}
ActiveMQServerLogger.LOGGER.debug("JMS admin object JNDI entry: " + entry.toString());
}
break;

View File

@ -59,7 +59,7 @@ public class FileBrokerTest
public void startWithoutJMS() throws Exception
{
ServerDTO serverDTO = new ServerDTO();
serverDTO.configuration = "activemq-configuration-nojms.xml";
serverDTO.configuration = "broker-nojms.xml";
FileBroker broker = null;
try
{

View File

@ -54,7 +54,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@ -67,11 +67,6 @@
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-tools</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-ra</artifactId>
@ -132,7 +127,6 @@
<artifactId>artemis-website</artifactId>
<version>${project.version}</version>
</dependency>
<!-- dependencies -->
<dependency>
<groupId>org.jboss.logmanager</groupId>

View File

@ -47,7 +47,7 @@
<includes>
<!-- modules -->
<include>org.apache.activemq:artemis-bootstrap</include>
<include>org.apache.activemq:artemis-cli</include>
<include>org.apache.activemq:artemis-commons</include>
<include>org.apache.activemq:artemis-core-client</include>
<include>org.apache.activemq:artemis-dto</include>
@ -63,7 +63,6 @@
<include>org.apache.activemq:artemis-selector</include>
<include>org.apache.activemq:artemis-server</include>
<include>org.apache.activemq:artemis-service-extensions</include>
<include>org.apache.activemq:artemis-tools</include>
<include>org.apache.activemq:artemis-web</include>
<include>org.apache.activemq.rest:artemis-rest</include>
<!-- dependencies -->

View File

@ -85,6 +85,26 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
*/
public final class DescribeJournal
{
private final List<RecordInfo> records;
private final List<PreparedTransactionInfo> preparedTransactions;
public DescribeJournal(List<RecordInfo> records, List<PreparedTransactionInfo> preparedTransactions)
{
this.records = records;
this.preparedTransactions = preparedTransactions;
}
public List<RecordInfo> getRecords()
{
return records;
}
public List<PreparedTransactionInfo> getPreparedTransactions()
{
return preparedTransactions;
}
public static void describeBindingsJournal(final String bindingsDir) throws Exception
{
@ -94,7 +114,7 @@ public final class DescribeJournal
describeJournal(bindingsFF, bindings, bindingsDir);
}
public static void describeMessagesJournal(final String messagesDir) throws Exception
public static DescribeJournal describeMessagesJournal(final String messagesDir) throws Exception
{
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null);
@ -111,7 +131,7 @@ public final class DescribeJournal
"amq",
1);
describeJournal(messagesFF, messagesJournal, messagesDir);
return describeJournal(messagesFF, messagesJournal, messagesDir);
}
/**
@ -119,7 +139,7 @@ public final class DescribeJournal
* @param journal
* @throws Exception
*/
private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal, final String path) throws Exception
private static DescribeJournal describeJournal(SequentialFileFactory fileFactory, JournalImpl journal, final String path) throws Exception
{
List<JournalFile> files = journal.orderFiles();
@ -417,6 +437,8 @@ public final class DescribeJournal
}
journal.stop();
return new DescribeJournal(records, preparedTransactions);
}
protected static void printCounters(final PrintStream out, final Map<Long, PageSubscriptionCounterImpl> counters)

View File

@ -1,90 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>artemis-tools</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Tools</name>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.activemq.artemis.tools.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release</id>
</profile>
</profiles>
</project>

View File

@ -1,231 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
import java.io.File;
import java.util.ArrayList;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
public class DataTool
{
private static final String BINDING_TYPE = "binding";
private static final String JOURNAL_TYPE = "journal";
private static final String JMS_TYPE = "jms";
private static final String ROLLBACK = "rollback";
private static final String DELETE = "delete";
public void process(String[] arg)
{
if (arg.length < 5)
{
printUsage();
System.exit(-1);
}
String type = arg[1];
String directoryName = arg[2];
String sizeStr = arg[3];
long sizeLong;
if (!type.equals(BINDING_TYPE) && !type.equals(JOURNAL_TYPE))
{
System.err.println("Invalid type: " + type);
printUsage();
System.exit(-1);
}
File directory = new File(directoryName);
if (!directory.exists() || !directory.isDirectory())
{
System.err.println("Invalid directory " + directoryName);
printUsage();
System.exit(-1);
}
try
{
sizeLong = Long.parseLong(sizeStr);
if (sizeLong <= 0)
{
System.err.println("Invalid size " + sizeLong);
printUsage();
System.exit(-1);
}
}
catch (Throwable e)
{
System.err.println("Error converting journal size: " + e.getMessage() + " couldn't convert size " + sizeStr);
printUsage();
System.exit(-1);
}
final String journalName;
final String exension;
if (type.equals(JOURNAL_TYPE))
{
journalName = "activemq-data";
exension = "amq";
}
else if (type.equals(BINDING_TYPE))
{
journalName = "activemq-bindings";
exension = "bindings";
}
else if (type.equals(JMS_TYPE))
{
journalName = "activemq-jms";
exension = "jms";
}
else
{
printUsage();
System.exit(-1);
return; // dumb compiler don't know System.exit interrupts the execution, some variables wouldn't be init
}
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(directoryName, null);
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
try
{
ArrayList<Long> txsToRollback = new ArrayList<Long>();
ArrayList<Long> idsToDelete = new ArrayList<Long>();
ArrayList<Long> listInUse = null;
for (int i = 4; i < arg.length; i++)
{
String str = arg[i];
if (str.equals(DELETE))
{
listInUse = idsToDelete;
}
else if (str.equals(ROLLBACK))
{
listInUse = txsToRollback;
}
else
{
try
{
if (listInUse == null)
{
System.err.println("You must specify either " + DELETE + " or " + ROLLBACK + " as a command for the IDs you're using");
printUsage();
System.exit(-1);
}
long id = Long.parseLong(str);
listInUse.add(id);
}
catch (Throwable e)
{
System.err.println("Error converting id " + str + " as a recordID");
printUsage();
System.exit(-1);
}
}
}
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
defaultValues.getJournalMinFiles(),
0,
0,
messagesFF,
journalName,
exension,
1);
messagesJournal.start();
messagesJournal.loadInternalOnly();
for (long tx : txsToRollback)
{
System.out.println("Rolling back " + tx);
try
{
messagesJournal.appendRollbackRecord(tx, true);
}
catch (Throwable e)
{
e.printStackTrace();
}
}
for (long id : idsToDelete)
{
System.out.println("Deleting record " + id);
try
{
messagesJournal.appendDeleteRecord(id, true);
}
catch (Throwable e)
{
e.printStackTrace();
}
}
messagesJournal.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void printUsage()
{
for (int i = 0; i < 10; i++)
{
System.err.println();
}
System.err.println(Main.USAGE + " binding|journal <directory> <size> [rollback | delete] record1,record2..recordN");
System.err.println();
System.err.println("Example:");
System.err.println("say you wanted to rollback a prepared TXID=100, and you want to remove records 300, 301, 302:");
System.err.println(Main.USAGE + " journal /tmp/your-folder 10485760 rollback 100 delete 300 301 302");
System.err.println();
System.err.println(".. and you can specify as many rollback and delete you like");
for (int i = 0; i < 10; i++)
{
System.err.println();
}
}
}

View File

@ -1,127 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
public class Main
{
public static final String USAGE = "Use: java -jar " + getJarName();
private static final String IMPORT = "import";
private static final String EXPORT = "export";
private static final String PRINT_DATA = "print-data";
private static final String PRINT_PAGES = "print-pages";
private static final String DATA_TOOL = "data-tool";
private static final String TRANSFER = "transfer-queue";
private static final String EXPORT_JOURNAL = "export-journal";
private static final String IMPORT_JOURNAL = "import-journal";
private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "|" + EXPORT_JOURNAL + "|" + IMPORT_JOURNAL + "]";
public static void main(String[] arg) throws Exception
{
if (arg.length == 0)
{
System.out.println(USAGE + OPTIONS);
System.exit(-1);
}
if (IMPORT_JOURNAL.equals(arg[0]))
{
ImportJournal tool = new ImportJournal();
tool.process(arg);
}
else if (EXPORT_JOURNAL.equals(arg[0]))
{
ExportJournal tool = new ExportJournal();
tool.process(arg);
}
else if (TRANSFER.equals(arg[0]))
{
TransferQueue tool = new TransferQueue();
tool.process(arg);
}
else if (DATA_TOOL.equals(arg[0]))
{
DataTool dataTool = new DataTool();
dataTool.process(arg);
}
else if (EXPORT.equals(arg[0]))
{
if (arg.length != 5)
{
System.out.println(USAGE + " " + EXPORT + " <bindings-directory> <journal-directory> <paging-directory> <large-messages-directory>");
System.exit(-1);
}
else
{
XmlDataExporter xmlDataExporter = new XmlDataExporter(System.out, arg[1], arg[2], arg[3], arg[4]);
xmlDataExporter.writeXMLData();
}
}
else if (IMPORT.equals(arg[0]))
{
if (arg.length != 6)
{
System.out.println(USAGE + " " + IMPORT + " <input-file> <host> <port> <transactional> <application-server-compatibility>");
System.exit(-1);
}
else
{
XmlDataImporter xmlDataImporter = new XmlDataImporter(arg[1], arg[2], arg[3], Boolean.parseBoolean(arg[4]), Boolean.parseBoolean(arg[5]));
xmlDataImporter.processXml();
}
}
else if (PRINT_DATA.equals(arg[0]))
{
if (arg.length != 3)
{
System.err.println(USAGE + " " + PRINT_DATA + " <bindings-directory> <journal-directory>");
System.exit(-1);
}
PrintData.printData(arg[1], arg[2]);
}
else if (PRINT_PAGES.equals(arg[0]))
{
if (arg.length != 3)
{
System.err.println(USAGE + " " + PRINT_PAGES + " <paging-directory> <journal-directory>");
System.exit(-1);
}
PrintPages.printPages(arg[1], arg[2]);
}
else
{
System.out.println(USAGE + OPTIONS);
}
}
protected static String getJarName()
{
try
{
Class klass = Main.class;
String url = klass.getResource('/' + klass.getName().replace('.', '/') + ".class").toString();
String jarName = url.substring(0, url.lastIndexOf('!'));
return jarName.substring(jarName.lastIndexOf('/') + 1);
}
catch (Throwable e)
{
return "tool-jar-name.jar";
}
}
}

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
import java.io.File;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
/**
* Writes a human-readable interpretation of the contents of a ActiveMQ {@link org.apache.activemq.artemis.core.journal.Journal}.
* <p>
* To run this class with Maven, use:
*
* <pre>
* cd activemq-server
* mvn -q exec:java -Dexec.args="/foo/activemq/bindings /foo/activemq/journal" -Dexec.mainClass="org.apache.activemq.tools.PrintData"
* </pre>
*/
public class PrintData // NO_UCD (unused code)
{
protected static void printData(String bindingsDirectory, String messagesDirectory)
{
File serverLockFile = new File(messagesDirectory, "server.lock");
if (serverLockFile.isFile())
{
try
{
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
fileLock.start();
System.out.println("********************************************");
System.out.println("Server's ID=" + fileLock.getNodeId().toString());
System.out.println("********************************************");
fileLock.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
System.out.println("********************************************");
System.out.println("B I N D I N G S J O U R N A L");
System.out.println("********************************************");
try
{
DescribeJournal.describeBindingsJournal(bindingsDirectory);
}
catch (Exception e)
{
e.printStackTrace();
}
System.out.println("********************************************");
System.out.println("M E S S A G E S J O U R N A L");
System.out.println("********************************************");
try
{
DescribeJournal.describeMessagesJournal(messagesDirectory);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

View File

@ -1,242 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tools;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
public class TransferQueue // NO_UCD (unused code)
{
public void process(String[] arg)
{
if (arg.length != 13 && arg.length != 14)
{
System.out.println("Invalid number of arguments! " + arg.length);
printUsage();
System.exit(-1);
}
String sourceHost;
String sourceUser;
String sourcePass;
int sourcePort;
String queue;
String targetHost;
int targetPort;
String targetUser;
String targetPassword;
String producingAddress;
int commit;
int waitTimeout;
String filter = null;
try
{
sourceHost = arg[1];
sourcePort = Integer.parseInt(arg[2]);
sourceUser = arg[3];
sourcePass = arg[4];
queue = arg[5];
targetHost = arg[6];
targetPort = Integer.parseInt(arg[7]);
targetUser = arg[8];
targetPassword = arg[9];
producingAddress = arg[10];
waitTimeout = Integer.parseInt(arg[11]);
commit = Integer.parseInt(arg[12]);
if (arg.length == 14)
{
filter = arg[13];
}
}
catch (Exception e)
{
e.printStackTrace();
printUsage();
System.exit(-1);
return; // the compiler doesn't understand exit as leaving the VM
}
Map<String, Object> sourceParameters = new HashMap<String, Object>();
sourceParameters.put(TransportConstants.HOST_PROP_NAME, sourceHost);
sourceParameters.put(TransportConstants.PORT_PROP_NAME, sourcePort);
Map<String, Object> targetParameters = new HashMap<String, Object>();
sourceParameters.put(TransportConstants.HOST_PROP_NAME, targetHost);
sourceParameters.put(TransportConstants.PORT_PROP_NAME, targetPort);
try
{
TransportConfiguration configurationSource = new TransportConfiguration(NettyConnectorFactory.class.getName(), sourceParameters);
ServerLocator locatorSource = ActiveMQClient.createServerLocator(false, configurationSource);
ClientSessionFactory factorySource = locatorSource.createSessionFactory();
ClientSession sessionSource = factorySource.createSession(sourceUser, sourcePass, false, false, false, false, 0);
ClientConsumer consumer;
if (filter == null)
{
consumer = sessionSource.createConsumer(queue);
}
else
{
consumer = sessionSource.createConsumer(queue, filter);
}
TransportConfiguration configurationTarget = new TransportConfiguration(NettyConnectorFactory.class.getName(), targetParameters);
ServerLocator locatorTarget = ActiveMQClient.createServerLocatorWithoutHA(configurationTarget);
ClientSessionFactory factoryTarget = locatorTarget.createSessionFactory();
ClientSession sessionTarget = factoryTarget.createSession(targetUser, targetPassword, false, false, false, false, 0);
ClientProducer producer = sessionTarget.createProducer(producingAddress);
sessionSource.start();
int countMessage = 0;
while (true)
{
ClientMessage message = consumer.receive(waitTimeout);
if (message == null)
{
break;
}
message.acknowledge();
if (!message.containsProperty("_AMQ_TOOL_original_address"))
{
message.putStringProperty("_AMQ_TOOL_original_address", message.getAddress().toString());
}
LinkedList<String> listToRemove = new LinkedList<String>();
for (SimpleString name : message.getPropertyNames())
{
if (name.toString().startsWith("_AMQ_ROUTE_TO"))
{
listToRemove.add(name.toString());
}
}
for (String str: listToRemove)
{
message.removeProperty(str);
}
producer.send(message);
if (countMessage++ % commit == 0)
{
System.out.println("Sent " + countMessage + " messages");
sessionTarget.commit();
sessionSource.commit();
}
}
sessionTarget.commit();
sessionSource.commit();
consumer.close();
producer.close();
sessionSource.close();
sessionTarget.close();
locatorSource.close();
locatorTarget.close();
}
catch (Exception e)
{
e.printStackTrace();
printUsage();
System.exit(-1);
}
}
public void printUsage()
{
for (int i = 0; i < 10; i++)
{
System.err.println();
}
System.err.println("This method will transfer messages from one queue into another, while removing internal properties such as ROUTE_TO.");
System.err.println();
System.err.println(Main.USAGE + " <source-IP> <source-port> <user> <password> <source-queue> <target-IP> <target-port> <user> <password> <target-address> <wait-timeout> <commit-size> [filter]");
System.err.println();
System.err.println("source-host: IP/hostname for the originating server for the messages");
System.err.println("source-port: port for the originating server for the messages");
System.err.println("user: Username used to connect to the source");
System.err.println("password: Password used to connect to the source");
System.err.println("source-port: port for the originating server for the messages");
System.err.println("source-queue: originating queue for the messages");
System.err.println();
System.err.println("target-host: IP/hostname for the destination server for the messages");
System.err.println("target-port: port for the destination server for the messages");
System.err.println("user: Username used to connect to the target");
System.err.println("password: Password used to connect to the target");
System.err.println("target-address: address at the destination server");
System.err.println();
System.err.println("wait-timeout: time in milliseconds");
System.err.println("commit-size: batch size for each transaction (in number of messages)");
System.err.println();
System.err.println("filter: You can optionally add a filter to the original queue");
for (int i = 0; i < 10; i++)
{
System.err.println();
}
}
}

View File

@ -39,7 +39,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -1,94 +1,145 @@
# Tools
Apache ActiveMQ Artemis ships with several helpful command line tools. All tools are
available from the activemq-tools-\<version\>-jar-with-dependencies.jar.
As the name suggests, this Java archive contains ActiveMQ along with all
of its dependencies. This is done to simplify the execution of the tools
by eliminating the need so specify a classpath. These tools are:
- **`print-data`**. Used for low-level inspection of the bindings and
message journals. It takes two parameters - `bindings-directory` and
`journal-directory`. These are the paths to the directories where
the bindings and message journals are stored, respectively. For
example:
You can use the artemis cli interface to execute data maintenance tools:
This is a list of sub-commands available
Name | Description
:--- | :---
exp | Export the message data using a special and independent XML format
imp | Imports the journal to a running broker using the output from expt
data | Prints a report about journal records and summary of existent records, as well a report on paging
encode | shows an internal format of the journal encoded to String
decode | imports the internal journal format from encode
You can use the help at the tool for more information on how to execute each of the tools. For example:
```
$ ./artemis help data print
NAME
artemis data print - Print data records information (WARNING: don't use
while a production server is running)
SYNOPSIS
artemis data print [--bindings <binding>] [--journal <journal>]
[--paging <paging>]
OPTIONS
--bindings <binding>
The folder used for bindings (default ../data/bindings)
--journal <journal>
The folder used for messages journal (default ../data/journal)
--paging <paging>
The folder used for paging (default ../data/paging)
```sh
java -jar activemq-tools-<version>-jar-with-dependencies.jar print-data /home/user/activemq/data/bindings /home/user/activemq/data/journal
```
- **`print-pages`**. Used for low-level inspection of paged message
data. It takes two parameters - `paging-directory` and
`journal-directory`. These are the paths to the directories where
paged messages and the message journals are stored, respectively.
For example:
```sh
java -jar activemq-tools-<version>-jar-with-dependencies.jar print-pages /home/user/activemq/data/paging-directory /home/user/activemq/data/journal
For a full list of data tools commands available use:
```
$ ./artemis help data
NAME
artemis data - data tools like (print|exp|imp|exp|encode|decode)
(example ./artemis data print)
- **`export`**. Used for exporting all binding and message data
(including paged and large messages) as well as JMS destinations and
connection factories (including JNDI bindings). The export is
structured as XML. This data can then be imported to another server
even if the server is a different version than the original. It
takes 4 parameters:
SYNOPSIS
artemis data
artemis data decode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data encode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data exp [--bindings <binding>]
[--large-messages <largeMessges>] [--paging <paging>]
[--journal <journal>]
artemis data imp [--password <password>] [--port <port>] [--host <host>]
[--user <user>] [--transaction]
artemis data print [--bindings <binding>] [--paging <paging>]
[--journal <journal>]
- `bindings-directory` - the path to the bindings directory.
COMMANDS
With no arguments, Display help information
- `journal-directory` - the path to the journal directory.
print
Print data records information (WARNING: don't use while a
production server is running)
- `paging-directory` - the path to the paging directory.
With --bindings option, The folder used for bindings (default
../data/bindings)
- `large-messages-directory` - the path to the large-messages
directory.
With --paging option, The folder used for paging (default
../data/paging)
Here's an example:
```
java -jar activemq-tools-<version>-jar-with-dependencies.jar export /home/user/activemq/data/bindings-directory /home/user/activemq/data/journal-directory /home/user/activemq/data/paging-directory /home/user/activemq/data/large-messages
```
With --journal option, The folder used for messages journal (default
../data/journal)
This tool will export directly to standard out so if the data needs
to be stored in a file please redirect as appropriate for the
operation system in use. Also, please note that the `export` tool is
single threaded so depending on the size of the journal it could
take awhile to complete.
exp
Export all message-data using an XML that could be interpreted by
any system.
- **`import`**. Used for importing data from an XML document generated
by the `export` tool. The `import` tool reads the XML document and
connects to an Apache ActiveMQ Artemis server via Netty to import all the data. It
takes 5 parameters:
With --bindings option, The folder used for bindings (default
../data/bindings)
- `input-file` - the path to the XML file generated by the
`export` tool.
With --large-messages option, The folder used for large-messages
(default ../data/largemessages)
- `host` - the IP address or hostname of the server where the data
should be imported.
With --paging option, The folder used for paging (default
../data/paging)
- `port` - the port where ActiveMQ is listening.
With --journal option, The folder used for messages journal (default
../data/journal)
- `transactional` - a `boolean` flag to indicate whether or not to
send all the *message* data in a single transaction. Valid
values are `true` or `false`.
imp
Import all message-data using an XML that could be interpreted by
any system.
- `application-server-compatibility` - a `boolean` flag to
indicate whether or not JNDI bindings need special treatment to
account for the way JBoss AS7, Wildfly, and JBoss EAP 6 handle
JNDI for remote clients. Each of these application servers
require a special JNDI binding to allow access from remote
clients. If this is `true` then every JNDI binding in the XML
will be duplicated in the "java:jboss/exported/" namespace thus
allowing both local and remote clients to use the same name when
accessing resources via JNDI. Valid values are `true` or
`false`.
With --password option, User name used to import the data. (default
null)
Here's an example:
With --port option, The port used to import the data (default 61616)
```sh
java -jar activemq-tools-<version>-jar-with-dependencies.jar import /home/user/exportData.xml 127.0.0.1 61616 false false
```
With --host option, The host used to import the data (default
localhost)
Like the `export` tool the `import` tool is single threaded so
depending on the size of the XML file it may take awhile for the
process to complete.
With --user option, User name used to import the data. (default
null)
With --transaction option, If this is set to true you will need a
whole transaction to commit at the end. (default false)
decode
Decode a journal's internal format into a new journal set of files
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default
../data/journal)
With --suffix option, The journal suffix (default amq)
With --file-size option, The journal size (default 10485760)
encode
Encode a set of journal files into an internal encoded data format
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default
../data/journal)
With --suffix option, The journal suffix (default amq)
With --file-size option, The journal size (default 10485760)
```

15
pom.xml
View File

@ -489,7 +489,7 @@
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-website</module>
<module>artemis-bootstrap</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
@ -500,7 +500,6 @@
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-tools</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
@ -516,7 +515,7 @@
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-website</module>
<module>artemis-bootstrap</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
@ -527,7 +526,6 @@
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-tools</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
@ -546,7 +544,7 @@
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-website</module>
<module>artemis-bootstrap</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
@ -557,7 +555,6 @@
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-tools</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
@ -586,7 +583,7 @@
<modules>
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-bootstrap</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
@ -597,7 +594,6 @@
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-tools</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
@ -620,7 +616,7 @@
<modules>
<module>artemis-dto</module>
<module>artemis-web</module>
<module>artemis-bootstrap</module>
<module>artemis-cli</module>
<module>artemis-commons</module>
<module>artemis-selector</module>
<module>artemis-core-client</module>
@ -631,7 +627,6 @@
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-tools</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>

View File

@ -124,7 +124,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -78,7 +78,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@ -88,7 +88,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-tools</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -49,7 +49,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.artemis.tests.util.UnitTestCase;
import org.apache.activemq.artemis.tools.ExportJournal;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -1428,8 +1427,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase
journal.forceMoveNextFile();
journal.checkReclaimStatus();
ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
journal.testCompact();
stopJournal();

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.tools.ExportJournal;
import org.apache.activemq.artemis.tools.ImportJournal;
import org.junit.Ignore;
import org.junit.Test;
import java.io.StringReader;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -29,8 +24,12 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ServiceTestBase;
import org.junit.Ignore;
import org.junit.Test;
public class ExportFormatTest extends ServiceTestBase
{
@ -113,19 +112,19 @@ public class ExportFormatTest extends ServiceTestBase
System.out.println();
System.out.println("copy & paste the following as bindingsFile:");
ExportJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out);
EncodeJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out);
System.out.println();
System.out.println("copy & paste the following as dataFile:");
ExportJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out);
EncodeJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out);
}
@Test
public void testConsumeFromFormat() throws Exception
{
ImportJournal.importJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, new StringReader(journalFile));
ImportJournal.importJournal(getBindingsDir(),
DecodeJournal.importJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, new StringReader(journalFile));
DecodeJournal.importJournal(getBindingsDir(),
"activemq-bindings",
"bindings",
2,

View File

@ -16,15 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
@ -32,29 +29,29 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.tools.XmlDataConstants;
import org.apache.activemq.artemis.tools.XmlDataExporter;
import org.apache.activemq.artemis.tools.XmlDataImporter;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
import org.apache.activemq.artemis.tests.util.ServiceTestBase;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.junit.Test;
/**
* A test of the XML export/import functionality
@ -114,8 +111,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -125,8 +122,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -209,8 +206,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -220,8 +217,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -263,8 +260,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -274,8 +271,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -299,8 +296,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -310,8 +307,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
@ -411,8 +408,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -422,8 +419,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ConnectionFactory cf1 = (ConnectionFactory) namingContext.lookup(jndi_binding1);
assertNotNull(cf1);
@ -479,8 +476,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -490,8 +487,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
assertNotNull(namingContext.lookup("myQueueJndiBinding1"));
@ -518,58 +515,6 @@ public class XmlImportExportTest extends ServiceTestBase
connection.close();
}
@Test
public void testJmsDestinationWithAppServerCompatibility() throws Exception
{
ClientSession session = basicSetUp();
jmsServer.createQueue(true, "myQueue", null, true, "myQueueJndiBinding1", "myQueueJndiBinding2");
jmsServer.createTopic(true, "myTopic", "myTopicJndiBinding1", "myTopicJndiBinding2");
session.close();
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
session = basicSetUp();
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, true);
xmlDataImporter.processXml();
assertNotNull(namingContext.lookup("myQueueJndiBinding1"));
assertNotNull(namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myQueueJndiBinding1"));
assertNotNull(namingContext.lookup("myQueueJndiBinding2"));
assertNotNull(namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myQueueJndiBinding2"));
assertNotNull(namingContext.lookup("myTopicJndiBinding1"));
assertNotNull(namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myTopicJndiBinding1"));
assertNotNull(namingContext.lookup("myTopicJndiBinding2"));
assertNotNull(namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myTopicJndiBinding2"));
jmsServer.createConnectionFactory("test-cf", false, JMSFactoryType.CF, Arrays.asList("in-vm1"), "test-cf");
ConnectionFactory cf = (ConnectionFactory) namingContext.lookup("test-cf");
Connection connection = cf.createConnection();
Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = jmsSession.createProducer((Destination) namingContext.lookup("myQueueJndiBinding1"));
producer.send(jmsSession.createTextMessage());
MessageConsumer consumer = jmsSession.createConsumer((Destination) namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myQueueJndiBinding1"));
connection.start();
assertNotNull(consumer.receive(3000));
consumer = jmsSession.createConsumer((Destination) namingContext.lookup("myTopicJndiBinding1"));
producer = jmsSession.createProducer((Destination) namingContext.lookup(XmlDataConstants.JNDI_COMPATIBILITY_PREFIX + "myTopicJndiBinding1"));
producer.send(jmsSession.createTextMessage());
assertNotNull(consumer.receive(3000));
connection.close();
}
@Test
public void testLargeMessage() throws Exception
{
@ -608,8 +553,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -619,8 +564,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
session.close();
session = factory.createSession(false, false);
session.start();
@ -667,8 +612,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -678,8 +623,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
consumer = session.createConsumer("myQueue1");
session.start();
msg = consumer.receive(CONSUMER_TIMEOUT);
@ -736,8 +681,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -747,8 +692,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -807,8 +752,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -818,8 +763,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -892,8 +837,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
//System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -903,8 +848,8 @@ public class XmlImportExportTest extends ServiceTestBase
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -951,8 +896,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -963,8 +908,8 @@ public class XmlImportExportTest extends ServiceTestBase
ClientSession managementSession = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -995,8 +940,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -1007,8 +952,8 @@ public class XmlImportExportTest extends ServiceTestBase
ClientSession managementSession = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -1050,8 +995,8 @@ public class XmlImportExportTest extends ServiceTestBase
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
@ -1062,8 +1007,8 @@ public class XmlImportExportTest extends ServiceTestBase
ClientSession managementSession = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();

View File

@ -1,433 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.tools;
import java.util.ArrayList;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tools.Main;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TransferMessageTest extends ClusterTestBase
{
public static final int NUM_MESSAGES = 2000;
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
setupServers();
}
@Override
@After
public void tearDown() throws Exception
{
stopServers();
super.tearDown();
}
protected boolean isNetty()
{
return true;
}
public void setupServers() throws Exception
{
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty());
setupServer(3, isFileStorage(), isNetty());
setupServer(4, isFileStorage(), isNetty());
}
@Test
public void testFreezeMessages() throws Throwable
{
try
{
setupCluster();
startServers();
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
createQueue(2, "queues.testaddress", "queue0", null, true);
createQueue(3, "queues.testaddress", "queue0", null, true);
createQueue(4, "queues.testaddress", "queue0", null, true);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
PostOfficeImpl postOffice = (PostOfficeImpl) servers[0].getPostOffice();
ArrayList<String> queuesToTransfer = new ArrayList<String>();
// System.out.println("bindings = " + postOffice.getAddressManager().getBindings().size());
for (Map.Entry<SimpleString, Binding> entry : postOffice.getAddressManager().getBindings().entrySet())
{
// System.out.println("entry: " + entry + " / " + entry.getValue() + " class = " + entry.getValue().getClass());
if (entry.getValue() instanceof LocalQueueBinding)
{
LocalQueueBinding localQueueBinding = (LocalQueueBinding) entry.getValue();
if (localQueueBinding.getBindable() instanceof QueueImpl)
{
QueueImpl queue = (QueueImpl) localQueueBinding.getBindable();
for (Consumer consumer : queue.getConsumers())
{
if (consumer instanceof ClusterConnectionBridge)
{
queuesToTransfer.add(entry.getKey().toString());
// System.out.println("Removing bridge from consumers, so messages should get stuck");
queue.removeConsumer(consumer);
}
}
}
}
}
consumers[0].getConsumer().close();
send(0, "queues.testaddress", NUM_MESSAGES, true, null);
createQueue(0, "output-result", "output-result", null, true);
queuesToTransfer.add("queue0");
closeAllConsumers();
for (String str : queuesToTransfer)
{
callTransfer("transfer-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
str,
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"output-result",
"500", "100");
}
ClientSession session = sfs[0].createSession(false, false);
ClientConsumer consumer = session.createConsumer("output-result");
session.start();
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
if (i % 100 == 0)
{
session.commit();
}
}
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
}
catch (Throwable e)
{
throw e;
}
}
public static void callTransfer(String... args) throws Exception
{
Main.main(args);
}
@Test
public void testFreezeMessagesWithFilter() throws Throwable
{
try
{
setupCluster();
startServers();
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
createQueue(2, "queues.testaddress", "queue0", null, true);
createQueue(3, "queues.testaddress", "queue0", null, true);
createQueue(4, "queues.testaddress", "queue0", null, true);
createQueue(0, "queues2.testaddress", "queue2", null, true);
createQueue(1, "queues2.testaddress", "queue2", null, true);
createQueue(2, "queues2.testaddress", "queue2", null, true);
createQueue(3, "queues2.testaddress", "queue2", null, true);
createQueue(4, "queues2.testaddress", "queue2", null, true);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
addConsumer(5, 0, "queue2", null);
addConsumer(6, 1, "queue2", null);
addConsumer(7, 2, "queue2", null);
addConsumer(8, 3, "queue2", null);
addConsumer(9, 4, "queue2", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
waitForBindings(0, "queues2.testaddress", 1, 1, true);
waitForBindings(1, "queues2.testaddress", 1, 1, true);
waitForBindings(2, "queues2.testaddress", 1, 1, true);
waitForBindings(3, "queues2.testaddress", 1, 1, true);
waitForBindings(4, "queues2.testaddress", 1, 1, true);
waitForBindings(0, "queues2.testaddress", 4, 4, false);
waitForBindings(1, "queues2.testaddress", 4, 4, false);
waitForBindings(2, "queues2.testaddress", 4, 4, false);
waitForBindings(3, "queues2.testaddress", 4, 4, false);
waitForBindings(4, "queues2.testaddress", 4, 4, false);
PostOfficeImpl postOffice = (PostOfficeImpl) servers[0].getPostOffice();
ArrayList<String> queuesToTransfer = new ArrayList<String>();
// System.out.println("bindings = " + postOffice.getAddressManager().getBindings().size());
for (Map.Entry<SimpleString, Binding> entry : postOffice.getAddressManager().getBindings().entrySet())
{
// System.out.println("entry: " + entry + " / " + entry.getValue() + " class = " + entry.getValue().getClass());
if (entry.getValue() instanceof LocalQueueBinding)
{
LocalQueueBinding localQueueBinding = (LocalQueueBinding) entry.getValue();
if (localQueueBinding.getBindable() instanceof QueueImpl)
{
QueueImpl queue = (QueueImpl) localQueueBinding.getBindable();
for (Consumer consumer : queue.getConsumers())
{
if (consumer instanceof ClusterConnectionBridge)
{
queuesToTransfer.add(entry.getKey().toString());
// System.out.println("Removing bridge from consumers, so messages should get stuck");
queue.removeConsumer(consumer);
}
}
}
}
}
closeAllConsumers();
send(0, "queues.testaddress", NUM_MESSAGES, true, null);
send(0, "queues2.testaddress", 1000, true, null);
createQueue(0, "tmp-queue", "tmp-queue", null, true);
queuesToTransfer.add("queue0");
queuesToTransfer.add("queue2");
for (String str : queuesToTransfer)
{
callTransfer("transfer-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
str,
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"tmp-queue",
"500", "100");
}
createQueue(0, "output-result", "output-result", null, true);
System.out.println("Transferring the main output-queue now");
callTransfer("transfer-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"tmp-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"output-result",
"500", "100",
"_AMQ_TOOL_original_address='queues.testaddress'");
ClientSession session = sfs[0].createSession(false, false);
ClientConsumer consumer = session.createConsumer("output-result");
session.start();
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
if (i % 100 == 0)
{
session.commit();
}
}
Assert.assertNull(consumer.receiveImmediate());
session.commit();
stopServers(1, 2, 3, 4);
System.out.println("Last transfer!!!");
callTransfer("transfer-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"tmp-queue",
"127.0.0.1", "" + TransportConstants.DEFAULT_PORT, "guest", "guest",
"output-result",
"500", "100",
"_AMQ_TOOL_original_address='queues2.testaddress'");
session.start();
for (int i = 0; i < 1000; i++)
{
ClientMessage msg = consumer.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
if (i % 100 == 0)
{
session.commit();
}
}
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
}
catch (Throwable e)
{
throw e;
}
}
protected void setupCluster() throws Exception
{
setupCluster(false);
}
protected void startServers() throws Exception
{
startServers(0, 1, 2, 3, 4);
}
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
}
}

View File

@ -74,7 +74,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -46,7 +46,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -72,7 +72,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -72,7 +72,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -79,7 +79,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -16,24 +16,23 @@
*/
package org.apache.activemq.artemis.tests.stress.journal;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ServiceTestBase;
import org.junit.Test;
import org.apache.activemq.artemis.tools.XmlDataExporter;
import org.apache.activemq.artemis.tools.XmlDataImporter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
public class XmlImportExportStressTest extends ServiceTestBase
{
public static final int CONSUMER_TIMEOUT = 5000;
@ -85,8 +84,8 @@ public class XmlImportExportStressTest extends ServiceTestBase
System.out.println("Writing XML...");
FileOutputStream xmlOutputStream = new FileOutputStream(FILE_NAME);
BufferedOutputStream bufferOut = new BufferedOutputStream(xmlOutputStream);
XmlDataExporter xmlDataExporter = new XmlDataExporter(bufferOut, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
xmlDataExporter.writeXMLData();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(bufferOut, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
bufferOut.close();
System.out.println("Done writing XML.");
@ -102,8 +101,8 @@ public class XmlImportExportStressTest extends ServiceTestBase
System.out.println("Reading XML...");
FileInputStream xmlInputStream = new FileInputStream(FILE_NAME);
XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
xmlDataImporter.processXml();
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session, managementSession);
xmlInputStream.close();
System.out.println("Done reading XML.");

View File

@ -61,7 +61,7 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -62,12 +62,12 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-bootstrap</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-tools</artifactId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -15,12 +15,6 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import org.apache.activemq.artemis.tests.util.UnitTestCase;
import org.apache.activemq.artemis.tools.ExportJournal;
import org.apache.activemq.artemis.tools.ImportJournal;
import org.junit.Before;
import org.junit.After;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
@ -31,15 +25,19 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.junit.Assert;
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.util.UnitTestCase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
public abstract class JournalImplTestBase extends UnitTestCase
{
@ -226,7 +224,7 @@ public abstract class JournalImplTestBase extends UnitTestCase
{
System.out.println("Exporting to " + getTestDir() + "/output.log");
ExportJournal.exportJournal(getTestDir(),
EncodeJournal.exportJournal(getTestDir(),
this.filePrefix,
this.fileExtension,
this.minFiles,
@ -253,7 +251,7 @@ public abstract class JournalImplTestBase extends UnitTestCase
file.delete();
}
ImportJournal.importJournal(getTestDir(),
DecodeJournal.importJournal(getTestDir(),
filePrefix,
fileExtension,
minFiles,