ARTEMIS-1613 Integrating JDBC into CLI (create print-data and exp)

This commit is contained in:
Clebert Suconic 2018-01-12 14:07:23 -05:00 committed by Justin Bertram
parent 64424f2873
commit 61a1123ee1
24 changed files with 789 additions and 184 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.cli;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.List;
import io.airlift.airline.Cli;
@ -176,7 +177,11 @@ public class Artemis {
}
public static void printBanner() throws Exception {
copy(Artemis.class.getResourceAsStream("banner.txt"), System.out);
printBanner(System.out);
}
public static void printBanner(PrintStream out) throws Exception {
copy(Artemis.class.getResourceAsStream("banner.txt"), out);
}
private static long copy(InputStream in, OutputStream out) throws Exception {

View File

@ -26,6 +26,7 @@ import io.airlift.airline.Option;
import io.airlift.airline.model.CommandGroupMetadata;
import io.airlift.airline.model.CommandMetadata;
import io.airlift.airline.model.GlobalMetadata;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.cli.factory.BrokerFactory;
import org.apache.activemq.artemis.cli.factory.jmx.ManagementFactory;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
@ -58,6 +59,10 @@ public abstract class Configurable extends ActionAbstract {
System.err.println();
System.err.println("Error:" + e.getMessage());
System.err.println();
if (!(e instanceof ActiveMQException)) {
e.printStackTrace();
}
helpGroup(group, command);
}
@ -76,6 +81,14 @@ public abstract class Configurable extends ActionAbstract {
protected FileConfiguration getFileConfiguration() throws Exception {
if (fileConfiguration == null) {
fileConfiguration = readConfiguration();
}
return fileConfiguration;
}
protected FileConfiguration readConfiguration() throws Exception {
FileConfiguration fileConfiguration = new FileConfiguration();
if (getBrokerInstance() == null) {
final String defaultLocation = "./data";
fileConfiguration = new FileConfiguration();
@ -86,7 +99,6 @@ public abstract class Configurable extends ActionAbstract {
fileConfiguration.setPagingDirectory(defaultLocation + "/paging");
fileConfiguration.setBrokerInstance(new File("."));
} else {
fileConfiguration = new FileConfiguration();
FileJMSConfiguration jmsConfiguration = new FileJMSConfiguration();
String serverConfiguration = getBrokerDTO().server.getConfigurationURI().toASCIIString();
@ -95,7 +107,6 @@ public abstract class Configurable extends ActionAbstract {
fileDeploymentManager.readConfiguration();
fileConfiguration.setBrokerInstance(new File(getBrokerInstance()));
}
}
return fileConfiguration;
}

View File

@ -103,6 +103,7 @@ public class Create extends InputAbstract {
public static final String ETC_STOMP_ACCEPTOR_TXT = "etc/stomp-acceptor.txt";
public static final String ETC_PING_TXT = "etc/ping-settings.txt";
public static final String ETC_COMMENTED_PING_TXT = "etc/commented-ping-settings.txt";
public static final String ETC_DATABASE_STORE_TXT = "etc/database-store.txt";
public static final String ETC_GLOBAL_MAX_SPECIFIED_TXT = "etc/global-max-specified.txt";
public static final String ETC_GLOBAL_MAX_DEFAULT_TXT = "etc/global-max-default.txt";
@ -261,6 +262,40 @@ public class Create extends InputAbstract {
@Option(name = "--global-max-size", description = "Maximum amount of memory which message data may consume (Default: Undefined, half of the system's memory)")
private String globalMaxSize;
@Option(name = "--jdbc", description = "It will activate jdbc")
boolean jdbc;
@Option(name = "--jdbc-bindings-table-name", description = "Name of the jdbc bindigns table")
private String jdbcBindings = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
@Option(name = "--jdbc-message-table-name", description = "Name of the jdbc messages table")
private String jdbcMessages = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
@Option(name = "--jdbc-large-message-table-name", description = "Name of the large messages table")
private String jdbcLargeMessages = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
@Option(name = "--jdbc-page-store-table-name", description = "Name of the page sotre messages table")
private String jdbcPageStore = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
@Option(name = "--jdbc-connection-url", description = "The connection used for the database")
private String jdbcURL = null;
@Option(name = "--jdbc-driver-class-name", description = "JDBC driver classname")
private String jdbcClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@Option(name = "--jdbc-network-timeout", description = "Network timeout")
long jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
@Option(name = "--jdbc-lock-acquisition-timeout", description = "Lock acquisition timeout")
long jdbcLockAcquisitionTimeout = ActiveMQDefaultConfiguration.getDefaultJournalLockAcquisitionTimeout();
@Option(name = "--jdbc-lock-renew-period", description = "Lock Renew Period")
long jdbcLockRenewPeriod = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
@Option(name = "--jdbc-lock-expiration", description = "Lock expiration")
long jdbcLockExpiration = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
private boolean IS_WINDOWS;
private boolean IS_CYGWIN;
@ -557,6 +592,25 @@ public class Create extends InputAbstract {
filters.put("${global-max-section}", readTextFile(ETC_GLOBAL_MAX_SPECIFIED_TXT, filters));
}
if (jdbc) {
if (jdbcURL == null) {
jdbcURL = "jdbc:derby:" + getInstance().getAbsolutePath() + "/data/derby/db;create=true";
}
filters.put("${jdbcBindings}", jdbcBindings);
filters.put("${jdbcMessages}", jdbcMessages);
filters.put("${jdbcLargeMessages}", jdbcLargeMessages);
filters.put("${jdbcPageStore}", jdbcPageStore);
filters.put("${jdbcURL}", jdbcURL);
filters.put("${jdbcClassName}", jdbcClassName);
filters.put("${jdbcNetworkTimeout}", "" + jdbcNetworkTimeout);
filters.put("${jdbcLockAcquisitionTimeout}", "" + jdbcLockAcquisitionTimeout);
filters.put("${jdbcLockRenewPeriod}", "" + jdbcLockRenewPeriod);
filters.put("${jdbcLockExpiration}", "" + jdbcLockExpiration);
filters.put("${jdbc}", readTextFile(ETC_DATABASE_STORE_TXT, filters));
} else {
filters.put("${jdbc}", "");
}
if (clustered) {
filters.put("${host}", getHostForClustered());
@ -682,6 +736,13 @@ public class Create extends InputAbstract {
filters.put("${auto-create}", isAutoCreate() ? "true" : "false");
if (jdbc) {
noAutoTune = true;
System.out.println();
printStar("Copy a jar containing the JDBC Driver '" + jdbcClassName + "' into " + directory.getAbsolutePath() + "/lib");
System.out.println();
}
performAutoTune(filters, journalType, dataFolder);
write(ETC_BROKER_XML, filters, false);
@ -725,6 +786,19 @@ public class Create extends InputAbstract {
return null;
}
private void printStar(String message) {
int size = Math.min(message.length(), 80);
StringBuffer buffer = new StringBuffer(size);
for (int i = 0; i < size; i++) {
buffer.append("*");
}
System.out.println(buffer.toString());
System.out.println();
System.out.println(message);
System.out.println();
System.out.println(buffer.toString());
}
private void setupJournalType() {
if (noJournalSync && !mapped) {
@ -935,7 +1009,13 @@ public class Create extends InputAbstract {
if (filters != null) {
for (Map.Entry<String, String> entry : filters.entrySet()) {
try {
content = replace(content, entry.getKey(), entry.getValue());
} catch (Throwable e) {
System.out.println("Error on " + entry.getKey());
e.printStackTrace();
System.exit(-1);
}
}
}
return content;

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
public class DBOption extends OptionalLocking {
protected JournalStorageManager storageManager;
protected Configuration config;
protected ExecutorService executor;
protected ExecutorFactory executorFactory;
protected ScheduledExecutorService scheduledExecutorService;
@Option(name = "--output", description = "Output name for the file")
private String output;
@Option(name = "--jdbc", description = "It will activate jdbc")
Boolean jdbc;
@Option(name = "--jdbc-bindings-table-name", description = "Name of the jdbc bindigns table")
private String jdbcBindings = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
@Option(name = "--jdbc-message-table-name", description = "Name of the jdbc messages table")
private String jdbcMessages = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
@Option(name = "--jdbc-large-message-table-name", description = "Name of the large messages table")
private String jdbcLargeMessages = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
@Option(name = "--jdbc-page-store-table-name", description = "Name of the page sotre messages table")
private String jdbcPageStore = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
@Option(name = "--jdbc-connection-url", description = "The connection used for the database")
private String jdbcURL = null;
@Option(name = "--jdbc-driver-class-name", description = "JDBC driver classname")
private String jdbcClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
public boolean isJDBC() throws Exception {
parseDBConfig();
return jdbc;
}
public String getJdbcBindings() throws Exception {
parseDBConfig();
return jdbcBindings;
}
public DBOption setJdbcBindings(String jdbcBindings) {
this.jdbcBindings = jdbcBindings;
return this;
}
public String getJdbcMessages() throws Exception {
parseDBConfig();
return jdbcMessages;
}
public DBOption setJdbcMessages(String jdbcMessages) {
this.jdbcMessages = jdbcMessages;
return this;
}
public String getJdbcLargeMessages() throws Exception {
parseDBConfig();
return jdbcLargeMessages;
}
public DBOption setJdbcLargeMessages(String jdbcLargeMessages) {
this.jdbcLargeMessages = jdbcLargeMessages;
return this;
}
public String getJdbcPageStore() throws Exception {
parseDBConfig();
return jdbcPageStore;
}
public DBOption setJdbcPageStore(String jdbcPageStore) {
this.jdbcPageStore = jdbcPageStore;
return this;
}
public String getJdbcURL() throws Exception {
parseDBConfig();
return jdbcURL;
}
public DBOption setJdbcURL(String jdbcURL) {
this.jdbcURL = jdbcURL;
return this;
}
public String getJdbcClassName() throws Exception {
parseDBConfig();
return jdbcClassName;
}
public DBOption setJdbcClassName(String jdbcClassName) {
this.jdbcClassName = jdbcClassName;
return this;
}
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
if (output != null) {
FileOutputStream fileOutputStream = new FileOutputStream(output);
PrintStream printStream = new PrintStream(fileOutputStream);
context.out = printStream;
Runtime.getRuntime().addShutdownHook(new Thread(printStream::close));
}
return null;
}
private void parseDBConfig() throws Exception {
if (jdbc == null) {
FileConfiguration fileConfiguration = getFileConfiguration();
jdbc = fileConfiguration.isJDBC();
if (jdbc) {
DatabaseStorageConfiguration storageConfiguration = (DatabaseStorageConfiguration) fileConfiguration.getStoreConfiguration();
jdbcBindings = storageConfiguration.getBindingsTableName();
jdbcMessages = storageConfiguration.getMessageTableName();
jdbcLargeMessages = storageConfiguration.getLargeMessageTableName();
jdbcPageStore = storageConfiguration.getPageStoreTableName();
jdbcURL = storageConfiguration.getJdbcConnectionUrl();
jdbcClassName = storageConfiguration.getJdbcDriverClassName();
}
}
}
// Get a new configuration based on the passed parameters and not on the parsed configuration
public Configuration getParameterConfiguration() throws Exception {
Configuration configuration = readConfiguration();
if (isJDBC()) {
DatabaseStorageConfiguration storageConfiguration = new DatabaseStorageConfiguration();
storageConfiguration.setJdbcConnectionUrl(getJdbcURL());
storageConfiguration.setJdbcDriverClassName(getJdbcClassName());
storageConfiguration.setBindingsTableName(getJdbcBindings());
storageConfiguration.setMessageTableName(getJdbcMessages());
storageConfiguration.setLargeMessageTableName(getJdbcLargeMessages());
storageConfiguration.setPageStoreTableName(getJdbcPageStore());
configuration.setStoreConfiguration(storageConfiguration);
} else {
configuration.setBindingsDirectory(getBinding());
configuration.setJournalDirectory(getJournal());
configuration.setPagingDirectory(getPaging());
configuration.setLargeMessagesDirectory(getLargeMessages());
}
return configuration;
}
protected PagingManager pagingmanager;
protected void initializeJournal(Configuration configuration) throws Exception {
this.config = configuration;
executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
executorFactory = new OrderedExecutorFactory(executor);
scheduledExecutorService = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
});
HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>(config.getWildcardConfiguration());
addressSettingsRepository.setDefault(new AddressSettings());
if (configuration.isJDBC()) {
storageManager = new JDBCJournalStorageManager(config, null, scheduledExecutorService, executorFactory, executorFactory, null);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) configuration.getStoreConfiguration(),
storageManager, 1000L,
scheduledExecutorService, executorFactory,
false, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
} else {
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
}
}
protected void cleanup() throws Exception {
executor.shutdown();
scheduledExecutorService.shutdown();
storageManager.stop();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.File;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -32,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -60,8 +62,10 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData extends OptionalLocking {
public class PrintData extends DBOption {
private static final String BINDINGS_BANNER = "B I N D I N G S J O U R N A L";
private static final String MESSAGES_BANNER = "M E S S A G E S J O U R N A L";
static {
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
}
@ -69,18 +73,49 @@ public class PrintData extends OptionalLocking {
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
Configuration configuration = getParameterConfiguration();
try {
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()));
if (configuration.isJDBC()) {
printDataJDBC(configuration, context.out);
} else {
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()), context.out);
}
} catch (Exception e) {
treatError(e, "data", "print");
}
return null;
}
public void printDataJDBC(Configuration configuration, PrintStream out) throws Exception {
initializeJournal(configuration);
Artemis.printBanner(out);
printBanner(out, BINDINGS_BANNER);
DescribeJournal.printSurvivingRecords(storageManager.getBindingsJournal(), out);
printBanner(out, MESSAGES_BANNER);
DescribeJournal describeJournal = DescribeJournal.printSurvivingRecords(storageManager.getMessageJournal(), out);
printPages(describeJournal, storageManager, pagingmanager, out);
cleanup();
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory) throws Exception {
printData(bindingsDirectory, messagesDirectory, pagingDirectory, System.out);
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, PrintStream out) throws Exception {
// Having the version on the data report is an information very useful to understand what happened
// When debugging stuff
Artemis.printBanner();
Artemis.printBanner(out);
File serverLockFile = new File(messagesDirectory, "server.lock");
@ -88,45 +123,35 @@ public class PrintData extends OptionalLocking {
try {
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
fileLock.start();
System.out.println("********************************************");
System.out.println("Server's ID=" + fileLock.getNodeId().toString());
System.out.println("********************************************");
printBanner(out, "Server's ID=" + fileLock.getNodeId().toString());
fileLock.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("********************************************");
System.out.println("B I N D I N G S J O U R N A L");
System.out.println("********************************************");
printBanner(out, BINDINGS_BANNER);
try {
DescribeJournal.describeBindingsJournal(bindingsDirectory);
DescribeJournal.describeBindingsJournal(bindingsDirectory, out);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println();
System.out.println("********************************************");
System.out.println("M E S S A G E S J O U R N A L");
System.out.println("********************************************");
printBanner(out, MESSAGES_BANNER);
DescribeJournal describeJournal = null;
try {
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory);
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory, out);
} catch (Exception e) {
e.printStackTrace();
return;
}
try {
System.out.println();
System.out.println("********************************************");
System.out.println("P A G I N G");
System.out.println("********************************************");
printBanner(out, "P A G I N G");
printPages(pagingDirectory, describeJournal);
printPages(pagingDirectory, describeJournal, out);
} catch (Exception e) {
e.printStackTrace();
return;
@ -134,13 +159,16 @@ public class PrintData extends OptionalLocking {
}
private static void printPages(File pageDirectory, DescribeJournal describeJournal) {
protected static void printBanner(PrintStream out, String x2) {
out.println();
out.println("********************************************");
out.println(x2);
out.println("********************************************");
}
private static void printPages(File pageDirectory, DescribeJournal describeJournal, PrintStream out) {
try {
PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
Set<Long> pgTXs = cursorACKs.getPgTXs();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory execfactory = new ExecutorFactory() {
@ -155,6 +183,20 @@ public class PrintData extends OptionalLocking {
addressSettingsRepository.setDefault(new AddressSettings());
PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
printPages(describeJournal, sm, manager, out);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void printPages(DescribeJournal describeJournal,
StorageManager sm,
PagingManager manager,
PrintStream out) throws Exception {
PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
Set<Long> pgTXs = cursorACKs.getPgTXs();
manager.start();
SimpleString[] stores = manager.getStoreNames();
@ -166,11 +208,11 @@ public class PrintData extends OptionalLocking {
if (pgStore != null) {
folder = pgStore.getFolder();
}
System.out.println("####################################################################################################");
System.out.println("Exploring store " + store + " folder = " + folder);
out.println("####################################################################################################");
out.println("Exploring store " + store + " folder = " + folder);
int pgid = (int) pgStore.getFirstPage();
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++) {
System.out.println("******* Page " + pgid);
out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
@ -180,11 +222,11 @@ public class PrintData extends OptionalLocking {
for (PagedMessage msg : msgs) {
msg.initMessage(sm);
System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
System.out.print(",Queues = ");
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
out.print(",Queues = ");
long[] q = msg.getQueueIDs();
for (int i = 0; i < q.length; i++) {
System.out.print(q[i]);
out.print(q[i]);
PagePosition posCheck = new PagePositionImpl(pgid, msgID);
@ -196,29 +238,26 @@ public class PrintData extends OptionalLocking {
}
if (acked) {
System.out.print(" (ACK)");
out.print(" (ACK)");
}
if (cursorACKs.getCompletePages(q[i]).contains(Long.valueOf(pgid))) {
System.out.println(" (PG-COMPLETE)");
out.println(" (PG-COMPLETE)");
}
if (i + 1 < q.length) {
System.out.print(",");
out.print(",");
}
}
if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID())) {
System.out.print(", **PG_TX_NOT_FOUND**");
out.print(", **PG_TX_NOT_FOUND**");
}
System.out.println();
out.println();
msgID++;
}
pgid++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**

View File

@ -32,9 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.airlift.airline.Command;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -45,31 +42,24 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.cli.commands.tools.DBOption;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.MessageDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
@ -77,24 +67,11 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends OptionalLocking {
public final class XmlDataExporter extends DBOption {
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
private JournalStorageManager storageManager;
private Configuration config;
private XMLStreamWriter xmlWriter;
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
@ -120,32 +97,52 @@ public final class XmlDataExporter extends OptionalLocking {
super.execute(context);
try {
process(context.out, getBinding(), getJournal(), getPaging(), getLargeMessages());
config = getParameterConfiguration();
process(context.out);
} catch (Exception e) {
treatError(e, "data", "exp");
}
return null;
}
/**
* Use setConfiguration and process(out) instead.
*
* @param out
* @param bindingsDir
* @param journalDir
* @param pagingDir
* @param largeMessagesDir
* @throws Exception
*/
@Deprecated
public void process(OutputStream out,
String bindingsDir,
String journalDir,
String pagingDir,
String largeMessagesDir) throws Exception {
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
initializeJournal(config);
writeOutput(out);
cleanup();
}
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
public void process(OutputStream out) throws Exception {
initializeJournal(config);
writeOutput(out);
cleanup();
}
protected void writeOutput(OutputStream out) throws Exception {
XMLOutputFactory factory = XMLOutputFactory.newInstance();
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
writeXMLData();
executor.shutdown();
}
private void writeXMLData() throws Exception {
@ -209,7 +206,7 @@ public final class XmlDataExporter extends OptionalLocking {
}
};
((JournalImpl) messageJournal).load(records, preparedTransactions, transactionFailureCallback, false);
messageJournal.load(records, preparedTransactions, transactionFailureCallback, false);
// Since we don't use these nullify the reference so that the garbage collector can clean them up
preparedTransactions = null;
@ -303,7 +300,7 @@ public final class XmlDataExporter extends OptionalLocking {
ActiveMQServerLogger.LOGGER.debug("Reading bindings journal from " + config.getBindingsDirectory());
((JournalImpl) bindingsJournal).load(records, null, null, false);
bindingsJournal.load(records, null, null);
for (RecordInfo info : records) {
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
@ -384,25 +381,13 @@ public final class XmlDataExporter extends OptionalLocking {
*/
private void printPagedMessagesAsXML() {
try {
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new ExecutorFactory() {
@Override
public ArtemisExecutor getExecutor() {
return ArtemisExecutor.delegate(executor);
}
};
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);
HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>(config.getWildcardConfiguration());
addressSettingsRepository.setDefault(new AddressSettings());
PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
manager.start();
pagingmanager.start();
SimpleString[] stores = manager.getStoreNames();
SimpleString[] stores = pagingmanager.getStoreNames();
for (SimpleString store : stores) {
PagingStore pageStore = manager.getPageStore(store);
PagingStore pageStore = pagingmanager.getPageStore(store);
if (pageStore != null) {
File folder = pageStore.getFolder();

View File

@ -27,6 +27,7 @@ under the License.
<name>${name}</name>
${jdbc}
<persistence-enabled>${persistence-enabled}</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO

View File

@ -0,0 +1,18 @@
<store>
<database-store>
<!-- The most efficient persistent layer for Artemis is the file-store,
however if you require a database please refer to your database provider
for any database specific questions.
We don't endorse any specific JDBC provider. Derby is provided by default for demonstration purposes. -->
<jdbc-driver-class-name>${jdbcClassName}</jdbc-driver-class-name>
<jdbc-connection-url>${jdbcURL}</jdbc-connection-url>
<message-table-name>${jdbcMessages}</message-table-name>
<bindings-table-name>${jdbcBindings}</bindings-table-name>
<large-message-table-name>${jdbcLargeMessages}</large-message-table-name>
<page-store-table-name>${jdbcPageStore}</page-store-table-name>
<jdbc-lock-acquisition-timeout>${jdbcLockAcquisitionTimeout}</jdbc-lock-acquisition-timeout>
<jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration>
<jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period>
<jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout>
</database-store>
</store>

View File

@ -140,6 +140,13 @@ public class ArtemisTest extends CliTestBase {
}
@Test
public void testCreateDB() throws Exception {
File instance1 = new File(temporaryFolder.getRoot(), "instance1");
Artemis.internalExecute("create", instance1.getAbsolutePath(), "--silent", "--jdbc");
}
@Test
public void testSimpleCreateMapped() throws Throwable {
try {

View File

@ -60,6 +60,7 @@ public class StreamClassPathTest {
openStream(Create.ETC_GLOBAL_MAX_SPECIFIED_TXT);
openStream(Create.ETC_GLOBAL_MAX_DEFAULT_TXT);
openStream(Create.ETC_JOLOKIA_ACCESS_XML);
openStream(Create.ETC_DATABASE_STORE_TXT);
}

View File

@ -423,8 +423,8 @@ public final class ActiveMQDefaultConfiguration {
// Default database url. Derby database is used by default.
private static String DEFAULT_DATABASE_URL = null;
// Default JDBC Driver class name
private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = null;
// Default JDBC Driver class name, derby by default just for demo purposes
private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
// Default message table name, used with Database storage type
private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";

View File

@ -881,12 +881,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,

View File

@ -194,9 +194,16 @@ public interface Journal extends ActiveMQComponent {
void lineUpContext(IOCompletion callback);
default JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception;
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception;
int getAlignment() throws Exception;

View File

@ -269,7 +269,8 @@ public final class FileWrapperJournal extends JournalBase {
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
TransactionFailureCallback transactionFailure,
boolean fixbadtx) throws Exception {
throw new ActiveMQUnsupportedPacketException();
}

View File

@ -1367,16 +1367,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return load(DummyLoader.INSTANCE, true, syncState);
}
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback) throws Exception {
return load(committedRecords, preparedTransactions, failureCallback, true);
}
/**
* @see JournalImpl#load(LoaderCallback)
*/
@Override
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,

View File

@ -1044,6 +1044,11 @@ public interface Configuration {
*/
File getBrokerInstance();
default boolean isJDBC() {
StoreConfiguration configuration = getStoreConfiguration();
return (configuration != null && configuration.getStoreType() == StoreConfiguration.StoreType.DATABASE);
}
StoreConfiguration getStoreConfiguration();
Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
@ -105,15 +106,22 @@ public final class DescribeJournal {
}
public static void describeBindingsJournal(final File bindingsDir) throws Exception {
describeBindingsJournal(bindingsDir, System.out);
}
public static void describeBindingsJournal(final File bindingsDir, PrintStream out) throws Exception {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
describeJournal(bindingsFF, bindings, bindingsDir);
describeJournal(bindingsFF, bindings, bindingsDir, out);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir) throws Exception {
return describeMessagesJournal(messagesDir, System.out);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir, PrintStream out) throws Exception {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null, 1);
// Will use only default values. The load function should adapt to anything different
@ -121,7 +129,7 @@ public final class DescribeJournal {
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), defaultValues.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
return describeJournal(messagesFF, messagesJournal, messagesDir);
return describeJournal(messagesFF, messagesJournal, messagesDir, out);
}
/**
@ -131,11 +139,10 @@ public final class DescribeJournal {
*/
private static DescribeJournal describeJournal(SequentialFileFactory fileFactory,
JournalImpl journal,
final File path) throws Exception {
final File path,
PrintStream out) throws Exception {
List<JournalFile> files = journal.orderFiles();
final PrintStream out = System.out;
final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
out.println("Journal path: " + path);
@ -246,6 +253,13 @@ public final class DescribeJournal {
printCounters(out, counters);
}
return printSurvivingRecords(journal, out);
}
public static DescribeJournal printSurvivingRecords(Journal journal,
PrintStream out) throws Exception {
final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
out.println("### Surviving Records Summary ###");
List<RecordInfo> records = new LinkedList<>();
@ -277,8 +291,6 @@ public final class DescribeJournal {
}
}, false);
counters.clear();
for (RecordInfo info : records) {
PageSubscriptionCounterImpl subsCounter = null;
long queueIDForCounter = 0;

View File

@ -426,8 +426,9 @@ public class ReplicatedJournal implements Journal {
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws Exception {
return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
final TransactionFailureCallback transactionFailure,
final boolean fixbadTX) throws Exception {
return localJournal.load(committedRecords, preparedTransactions, transactionFailure, fixbadTX);
}
/**

View File

@ -111,9 +111,9 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
}
final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis > expirationMillis) {
if (measuredRenewPeriodMillis - expirationMillis > 100) {
LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
} else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) {
} else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) {
LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
}
}

View File

@ -0,0 +1,131 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>jms-examples</artifactId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<artifactId>datatabase</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Expiry Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client-all</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<libList>
<!-- adding derby for the example -->
<arg>org.apache.derby:derby:${apache.derby.version}</arg>
</libList>
<args>
<arg>--jdbc</arg>
</args>
</configuration>
</execution>
<execution>
<id>start</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.DBExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>datatabase</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>com.vladsch.flexmark</groupId>
<artifactId>markdown-page-generator-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,7 @@
# Database example
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
This example shows you how to configure ActiveMQ Artemis to run with a database.
Notice this is not making any assumption of what is the recommended database to be used with Artemis. After all we recommend the artemis journal to be used, however in certain environments users will prefer databases for specific reasons.

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
/**
* An example showing how messages are moved to an expiry queue when they expire.
*/
public class DBExample {
public static void main(final String[] args) throws Exception {
InitialContext initialContext = null;
ConnectionFactory cf = new ActiveMQConnectionFactory();
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue1");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("this is a text message");
producer.send(message);
System.out.println("Sent message to " + queue.getQueueName() + ": " + message.getText());
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
System.out.println("Received message from " + queue.getQueueName() + ": " + messageReceived);
}
}
}

View File

@ -47,6 +47,7 @@ under the License.
<module>cdi</module>
<module>client-kickoff</module>
<module>consumer-rate-limit</module>
<module>database</module>
<module>dead-letter</module>
<module>delayed-redelivery</module>
<module>divert</module>
@ -114,6 +115,7 @@ under the License.
<module>cdi</module>
<module>client-kickoff</module>
<module>consumer-rate-limit</module>
<module>database</module>
<module>dead-letter</module>
<module>delayed-redelivery</module>
<module>divert</module>

View File

@ -813,7 +813,8 @@ public final class ReplicationTest extends ActiveMQTestBase {
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws Exception {
final TransactionFailureCallback transactionFailure,
final boolean fixbadtx) throws Exception {
return new JournalLoadInformation();
}