ARTEMIS-27 / ARTEMIS-340 Add JDBC Storage Manager

This commit is contained in:
Martyn Taylor 2016-01-07 10:12:07 +00:00 committed by Clebert Suconic
parent 9dd9c021a0
commit 64f74acdbc
38 changed files with 3109 additions and 2255 deletions

1
.gitignore vendored
View File

@ -9,6 +9,7 @@ ratReport.txt
.settings
.checkstyle
.factorypath
**/derby.log
# for native build
CMakeCache.txt

View File

@ -396,6 +396,18 @@ public final class ActiveMQDefaultConfiguration {
// How often the reaper will be run to check for timed out group bindings. Only valid for LOCAL handlers
private static long DEFAULT_GROUPING_HANDLER_REAPER_PERIOD = 30000;
// Which store type to use, options are FILE or DATABASE, FILE is default.
private static String DEFAULT_STORE_TYPE = "FILE";
// Default database url. Derby database is used by default.
private static String DEFAULT_DATABASE_URL = "jdbc:derby:data/derby;create=true";
// Default message table name, used with Database storage type
private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";
// Default bindings table name, used with Database storage type
private static String DEFAULT_BINDINGS_TABLE_NAME = "BINDINGS";
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1052,4 +1064,28 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_GROUPING_HANDLER_REAPER_PERIOD;
}
/**
* The default storage type. Options are FILE and DATABASE.
*/
public static String getDefaultStoreType() {
return DEFAULT_STORE_TYPE;
}
/**
* The default database URL, used with DATABASE store type.
*/
public static String getDefaultDatabaseUrl() {
return DEFAULT_DATABASE_URL;
}
/**
* The default Message Journal table name, used with DATABASE store.
*/
public static String getDefaultMessageTableName() {
return DEFAULT_MESSAGE_TABLE_NAME;
}
public static String getDefaultBindingsTableName() {
return DEFAULT_BINDINGS_TABLE_NAME;
}
}

View File

@ -33,6 +33,7 @@ import java.util.Timer;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
@ -40,7 +41,6 @@ import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.derby.jdbc.AutoloadedDriver;

View File

@ -66,6 +66,11 @@
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>

View File

@ -271,6 +271,7 @@ public interface Configuration {
/**
* Add an acceptor to the config
*
* @param name the name of the acceptor
* @param uri the URI of the acceptor
* @return this
@ -935,4 +936,7 @@ public interface Configuration {
*/
File getBrokerInstance();
StoreConfiguration getStoreConfiguration();
Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
public interface StoreConfiguration extends Serializable {
public enum StoreType {
FILE,
DATABASE
}
StoreType getStoreType();
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.security.Role;
@ -228,6 +229,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private HAPolicyConfiguration haPolicyConfiguration;
private StoreConfiguration storeConfiguration;
/**
* Parent folder for all data folders.
*/
@ -407,7 +410,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
@ -422,7 +424,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public ConfigurationImpl clearConnectorConfigurations() {
connectorConfigs.clear();
@ -1278,6 +1279,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return resolveProtocols;
}
@Override
public StoreConfiguration getStoreConfiguration() {
return storeConfiguration;
}
@Override
public ConfigurationImpl setStoreConfiguration(StoreConfiguration storeConfiguration) {
this.storeConfiguration = storeConfiguration;
return this;
}
@Override
public int hashCode() {
final int prime = 31;

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
public class DatabaseStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
}
public String getMessageTableName() {
return messageTableName;
}
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
public String getBindingsTableName() {
return bindingsTableName;
}
public void setBindingsTableName(String bindingsTableName) {
this.bindingsTableName = bindingsTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
public class FileStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
}
public String getMessageTableName() {
return messageTableName;
}
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
public String getBindingsTableName() {
return bindingsTableName;
}
public void setBindingsTableName(String bindingsTableName) {
this.bindingsTableName = bindingsTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
}

View File

@ -54,6 +54,8 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfigu
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.Validators;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -214,6 +216,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration());
}
NodeList storeTypeNodes = e.getElementsByTagName("store");
if (storeTypeNodes.getLength() > 0) {
parseStoreConfiguration((Element) storeTypeNodes.item(0), config);
}
config.setResolveProtocols(getBoolean(e, "resolve-protocols", config.isResolveProtocols()));
config.setPersistenceEnabled(getBoolean(e, "persistence-enabled", config.isPersistenceEnabled()));
@ -905,6 +913,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
HA_LIST.add("replication");
}
private static final ArrayList<String> STORE_TYPE_LIST = new ArrayList<>();
static {
STORE_TYPE_LIST.add("database-store");
STORE_TYPE_LIST.add("file-store");
}
private void parseStoreConfiguration(final Element e, final Configuration mainConfig) {
for (String storeType : STORE_TYPE_LIST) {
NodeList storeNodeList = e.getElementsByTagName(storeType);
if (storeNodeList.getLength() > 0) {
Element storeNode = (Element) storeNodeList.item(0);
if (storeNode.getTagName().equals("database-store")) {
mainConfig.setStoreConfiguration(createDatabaseStoreConfig(storeNode));
}
else if (storeNode.getTagName().equals("file-store")) {
mainConfig.setStoreConfiguration(createFileStoreConfig(storeNode));
}
}
}
}
private void parseHAPolicyConfiguration(final Element e, final Configuration mainConfig) {
for (String haType : HA_LIST) {
NodeList haNodeList = e.getElementsByTagName(haType);
@ -1105,6 +1135,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return null;
}
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) {
NodeList databaseStoreNode = storeNode.getElementsByTagName("database-store");
DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration();
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
return conf;
}
private FileStorageConfiguration createFileStoreConfig(Element storeNode) {
return new FileStorageConfiguration();
}
private void parseBroadcastGroupConfiguration(final Element e, final Configuration mainConfig) {
String name = e.getAttribute("name");

View File

@ -26,11 +26,9 @@ public final class AddMessageRecord {
final ServerMessage message;
// mtaylor (Added to compile)
public long scheduledDeliveryTime;
private long scheduledDeliveryTime;
// mtaylor (Added to compile)
public int deliveryCount;
private int deliveryCount;
public ServerMessage getMessage() {
return message;
@ -44,4 +42,11 @@ public final class AddMessageRecord {
return deliveryCount;
}
public void setScheduledDeliveryTime(long scheduledDeliveryTime) {
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
public void setDeliveryCount(int deliveryCount) {
this.deliveryCount = deliveryCount;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager {
public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) {
super(config, executorFactory);
}
public JDBCJournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, criticalErrorListener);
}
@Override
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName());
bindingsJournal = localBindings;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName());
messageJournal = localMessage;
}
@Override
public synchronized void stop(boolean ioCriticalError) throws Exception {
if (!started) {
return;
}
if (!ioCriticalError) {
performCachedLargeMessageDeletes();
// Must call close to make sure last id is persisted
if (journalLoaded && idGenerator != null)
idGenerator.persistCurrentID();
}
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
latch.await(30, TimeUnit.SECONDS);
beforeStop();
((JDBCJournalImpl) bindingsJournal).stop(false);
messageJournal.stop();
singleThreadExecutor.shutdown();
journalLoaded = false;
started = false;
}
}

View File

@ -27,7 +27,6 @@ public final class JournalRecordIds {
// grouping journal record type
// mtaylor Added to compile
public static final byte GROUP_RECORD = 20;
// BindingsImpl journal record type

View File

@ -22,7 +22,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
/**
@ -40,8 +40,8 @@ public class ReplicationStartSyncMessage extends PacketImpl {
private boolean allowsAutoFailBack;
public enum SyncDataType {
JournalBindings(JournalContent.BINDINGS.typeByte),
JournalMessages(JournalContent.MESSAGES.typeByte),
JournalBindings(AbstractJournalStorageManager.JournalContent.BINDINGS.typeByte),
JournalMessages(AbstractJournalStorageManager.JournalContent.MESSAGES.typeByte),
LargeMessages((byte) 2);
private byte code;
@ -50,8 +50,8 @@ public class ReplicationStartSyncMessage extends PacketImpl {
this.code = code;
}
public static JournalContent getJournalContentType(SyncDataType dataType) {
return JournalContent.getType(dataType.code);
public static AbstractJournalStorageManager.JournalContent getJournalContentType(SyncDataType dataType) {
return AbstractJournalStorageManager.JournalContent.getType(dataType.code);
}
public static SyncDataType getDataType(byte code) {
@ -86,7 +86,7 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
public ReplicationStartSyncMessage(JournalFile[] datafiles,
JournalContent contentType,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) {
this();

View File

@ -23,7 +23,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
/**
@ -35,7 +35,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
/**
* The JournalType or {@code null} if sync'ing large-messages.
*/
private JournalContent journalType;
private AbstractJournalStorageManager.JournalContent journalType;
/**
* This value refers to {@link org.apache.activemq.artemis.core.journal.impl.JournalFile#getFileID()}, or the
* message id if we are sync'ing a large-message.
@ -74,7 +74,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
super(REPLICATION_SYNC_FILE);
}
public ReplicationSyncFileMessage(JournalContent content,
public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
SimpleString storeName,
long id,
int size,
@ -135,7 +135,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
fileId = buffer.readLong();
switch (FileType.getFileType(buffer.readByte())) {
case JOURNAL: {
journalType = JournalContent.getType(buffer.readByte());
journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
fileType = FileType.JOURNAL;
break;
}
@ -160,7 +160,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
return fileId;
}
public JournalContent getJournalContent() {
public AbstractJournalStorageManager.JournalContent getJournalContent() {
return journalType;
}

View File

@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;

View File

@ -32,12 +32,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@ -437,7 +437,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @throws ActiveMQException
* @throws Exception
*/
public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception {
public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
if (!enabled) {
return;
}
@ -473,7 +473,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
private void sendLargeFile(JournalContent content,
private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
SimpleString pageStore,
final long id,
SequentialFile file,
@ -536,7 +536,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @throws ActiveMQException
*/
public void sendStartSyncMessage(JournalFile[] datafiles,
JournalContent contentType,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) throws ActiveMQException {
if (enabled)

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@ -70,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@ -1479,8 +1481,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/
private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
}
// Default to File Based Storage Manager, (Legacy default configuration).
else {
return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
}
}
return new NullStorageManager();
}

View File

@ -683,6 +683,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="store" type="storeType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The Store Type used by the server
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="security-settings" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -1437,6 +1445,55 @@
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="storeType">
<xsd:choice>
<xsd:element name="file-store" type="fileStoreType" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
Use a file based store for peristing journal, paging and large messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="database-store" type="databaseStoreType" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
Use a database for persisting journal, paging and large messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:choice>
</xsd:complexType>
<xsd:complexType name="fileStoreType">
</xsd:complexType>
<xsd:complexType name="databaseStoreType">
<xsd:all>
<xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to store message journal entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to store bindings journal entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
<xsd:complexType name="haPolicyType">
<xsd:choice>
<xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1">
@ -1489,6 +1546,7 @@
</xsd:choice>
</xsd:complexType>
<xsd:complexType name="haColocationReplicationType">
<xsd:all>
<xsd:element name="request-backup" type="xsd:boolean" minOccurs="0" maxOccurs="1" default="false">

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.impl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
@Test
public void databaseStoreConfigTest() throws Exception {
Configuration configuration = createConfiguration("database-store-config.xml");
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
assertEquals(StoreConfiguration.StoreType.DATABASE, server.getConfiguration().getStoreConfiguration().getStoreType());
}
protected Configuration createConfiguration(String fileName) throws Exception {
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(fileName);
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
// we need this otherwise the data folder will be located under activemq-server and not on the temporary directory
fc.setPagingDirectory(getTestDir() + "/" + fc.getPagingDirectory());
fc.setLargeMessagesDirectory(getTestDir() + "/" + fc.getLargeMessagesDirectory());
fc.setJournalDirectory(getTestDir() + "/" + fc.getJournalDirectory());
fc.setBindingsDirectory(getTestDir() + "/" + fc.getBindingsDirectory());
return fc;
}
}

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -387,6 +388,19 @@ public abstract class ActiveMQTestBase extends Assert {
return createDefaultConfig(0, netty);
}
protected Configuration createDefaultJDBCConfig() throws Exception {
Configuration configuration = createDefaultConfig(true);
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGES");
configuration.setStoreConfiguration(dbStorageConfiguration);
return configuration;
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
@ -749,6 +763,10 @@ public abstract class ActiveMQTestBase extends Assert {
return testDir;
}
protected final String getTestJDBCConnectionUrl() {
return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true";
}
protected final File getTestDirfile() {
return new File(testDir);
}

View File

@ -0,0 +1,30 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration
xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq ../../main/resources/schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
</database-store>
</store>
</core>
</configuration>

View File

@ -3,12 +3,14 @@
In this chapter we will describe how persistence works with Apache ActiveMQ Artemis and
how to configure it.
Apache ActiveMQ Artemis ships with a high performance journal. Since Apache ActiveMQ Artemis handles
its own persistence, rather than relying on a database or other 3rd
party persistence engine it is very highly optimised for the specific
messaging use cases.
Apache ActiveMQ Artemis ships with two persistence options. The Apache ActiveMQ Artemis File journal
which is highly optimized for the messaging use case and gives great performance, and also Apache Artemis
JDBC Store, which uses JDBC to connect to a database of your choice. The JDBC Store is still under development,
but it is possible to use it's journal features, (essentially everything except for paging and large messages).
An Apache ActiveMQ Artemis journal is an *append only* journal. It consists of a set of
## Apache ActiveMQ Artemis File Journal (Default)
An Apache ActiveMQ Artemis file journal is an *append only* journal. It consists of a set of
files on disk. Each file is pre-created to a fixed size and initially
filled with padding. As operations are performed on the server, e.g. add
message, update message, delete message, records are appended to the
@ -126,7 +128,7 @@ If no persistence is required at all, Apache ActiveMQ Artemis can also be config
not to persist any data at all to storage as discussed in the Configuring
the broker for Zero Persistence section.
## Configuring the bindings journal
### Configuring the bindings journal
The bindings journal is configured using the following attributes in
`broker.xml`
@ -143,11 +145,11 @@ The bindings journal is configured using the following attributes in
`bindings-directory` if it does not already exist. The default value
is `true`
## Configuring the jms journal
### Configuring the jms journal
The jms config shares its configuration with the bindings journal.
## Configuring the message journal
### Configuring the message journal
The message journal is configured using the following attributes in
`broker.xml`
@ -297,7 +299,7 @@ The message journal is configured using the following attributes in
The default for this parameter is `30`
## An important note on disabling disk write cache.
### An important note on disabling disk write cache.
> **Warning**
>
@ -336,7 +338,7 @@ The message journal is configured using the following attributes in
> On Windows you can check / change the setting by right clicking on the
> disk and clicking properties.
## Installing AIO
### Installing AIO
The Java NIO journal gives great performance, but If you are running
Apache ActiveMQ Artemis using Linux Kernel 2.6 or later, we highly recommend you use
@ -356,6 +358,40 @@ Using aptitude, (e.g. on Ubuntu or Debian system):
apt-get install libaio
## Apache ActiveMQ Artemis JDBC Persistence
The Apache ActiveMQ Artemis JDBC persistence store is still under development and only supports persistence of standard messages and bindings (this is everything except large messages and paging). The JDBC store uses a JDBC connection to store messages and bindings data in records in database tables. The data stored in the database tables is encoded using Apache ActiveMQ Artemis journal encoding.
### Configuring JDBC Persistence
To configure Apache ActiveMQ Artemis to use a database for persisting messages and bindings data you must do two things.
1. Add the appropriate JDBC client libraries to the Artemis runtime. You can do this by dropping the relevant jars in the lib folder of the ActiveMQ Artemis distribution.
2. create a store element in your broker.xml config file under the <core> element. For example:
```xml
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
</database-store>
</store>
```
- `jdbc-connection-url`
The full JDBC connection URL for your database server. The connection url should include all configuration parameters and database name.
- `bindings-table-name`
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `message-table-name`
The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
## Configuring Apache ActiveMQ Artemis for Zero Persistence
In some situations, zero persistence is sometimes required for a
@ -366,3 +402,5 @@ straightforward. Simply set the parameter `persistence-enabled` in
Please note that if you set this parameter to false, then *zero*
persistence will occur. That means no bindings data, message data, large
message data, duplicate id caches or paging data will be persisted.

13
pom.xml
View File

@ -47,6 +47,7 @@
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-jdbc-store</module>
<module>artemis-maven-plugin</module>
<module>artemis-server-osgi</module>
<module>integration/activemq-spring-integration</module>
@ -82,6 +83,7 @@
<resteasy.version>3.0.13.Final</resteasy.version>
<proton.version>0.10</proton.version>
<fuse.mqtt.client.version>1.10</fuse.mqtt.client.version>
<apache.derby.version>10.11.1.1</apache.derby.version>
<skipUnitTests>true</skipUnitTests>
<skipJmsTests>true</skipJmsTests>
<skipExtraTests>true</skipExtraTests>
@ -202,6 +204,11 @@
<version>${fuse.mqtt.client.version}</version>
<!-- Apache v2.0 License -->
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${apache.derby.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
@ -588,10 +595,12 @@
<module>artemis-native</module>
<module>artemis-protocols</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>artemis-jdbc-store</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
@ -623,6 +632,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -681,6 +691,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -723,6 +734,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -757,6 +769,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>

View File

@ -117,6 +117,11 @@
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>

View File

@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class FakeEncodingSupportImpl implements EncodingSupport {
private byte[] data;
public FakeEncodingSupportImpl(byte[] data) {
this.data = data;
}
@Override
public int getEncodeSize() {
return data.length;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeBytes(data);
}
@Override
public void decode(ActiveMQBuffer buffer) {
data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class JDBCJournalTest {
private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
private JDBCJournalImpl journal;
private String jdbcUrl;
private Properties jdbcConnectionProperties;
@Before
public void setup() throws Exception {
jdbcUrl = "jdbc:derby:target/data;create=true";
journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME);
journal.start();
}
@Test
public void testInsertRecords() throws Exception {
int noRecords = 10;
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(1, (byte) 1, new byte[0], true);
}
Thread.sleep(3000);
assertEquals(noRecords, journal.getNumberOfRecords());
}
@Test
public void testCallbacks() throws Exception {
final int noRecords = 10;
final CountDownLatch done = new CountDownLatch(noRecords);
IOCompletion completion = new IOCompletion() {
@Override
public void storeLineUp() {
}
@Override
public void done() {
done.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(1, (byte) 1, new FakeEncodingSupportImpl(new byte[0]), true, completion);
}
journal.sync();
done.await(5, TimeUnit.SECONDS);
assertEquals(done.getCount(), 0);
}
@Test
public void testReadJournal() throws Exception {
int noRecords = 100;
// Standard Add Records
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(i, (byte) i, new byte[i], true);
}
// TX Records
int noTx = 10;
int noTxRecords = 100;
for (int i = 1000; i < 1000 + noTx; i++) {
for (int j = 0; j < noTxRecords; j++) {
journal.appendAddRecordTransactional(i, Long.valueOf(i + "" + j), (byte) 1, new byte[0]);
}
journal.appendPrepareRecord(i, new byte[0], true);
journal.appendCommitRecord(i, true);
}
Thread.sleep(2000);
List<RecordInfo> recordInfos = new ArrayList<>();
List<PreparedTransactionInfo> txInfos = new ArrayList<>();
journal.load(recordInfos, txInfos, null);
assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size());
}
@After
public void tearDown() throws Exception {
journal.destroy();
}
}

View File

@ -16,22 +16,30 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AddressSettingsConfigurationStorageTest extends StorageManagerTestBase {
private Map<SimpleString, PersistedAddressSetting> mapExpectedAddresses;
public AddressSettingsConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {
@ -40,7 +48,7 @@ public class AddressSettingsConfigurationStorageTest extends StorageManagerTestB
mapExpectedAddresses = new HashMap<>();
}
protected void addAddress(JournalStorageManager journal1, String address, AddressSettings setting) throws Exception {
protected void addAddress(StorageManager journal1, String address, AddressSettings setting) throws Exception {
SimpleString str = new SimpleString(address);
PersistedAddressSetting persistedSetting = new PersistedAddressSetting(str, setting);
mapExpectedAddresses.put(str, persistedSetting);
@ -84,7 +92,7 @@ public class AddressSettingsConfigurationStorageTest extends StorageManagerTestB
* @param journal1
* @throws Exception
*/
private void checkAddresses(JournalStorageManager journal1) throws Exception {
private void checkAddresses(StorageManager journal1) throws Exception {
List<PersistedAddressSetting> listSetting = journal1.recoverAddressSettings();
assertEquals(mapExpectedAddresses.size(), listSetting.size());

View File

@ -17,12 +17,13 @@
package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -30,8 +31,11 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;
public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@ -39,6 +43,17 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
ArrayList<Long> deletedMessage = new ArrayList<>();
public DeleteMessagesOnStartupTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
// This is only applicable for FILE based store, as the database storage manager will automatically delete records.
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}};
return Arrays.asList(params);
}
@Test
public void testDeleteMessagesOnStartup() throws Exception {
createStorage();

View File

@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@ -32,6 +33,10 @@ import org.junit.Test;
public class DuplicateCacheTest extends StorageManagerTestBase {
public DuplicateCacheTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@After
@Override
public void tearDown() throws Exception {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
@ -36,6 +37,10 @@ public class JMSConnectionFactoryConfigurationStorageTest extends StorageManager
private Map<String, PersistedConnectionFactory> mapExpectedCFs;
public JMSConnectionFactoryConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.junit.Assert;
import org.junit.Test;
@ -27,6 +28,10 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
public class JMSStorageManagerTest extends StorageManagerTestBase {
public JMSStorageManagerTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
//https://issues.jboss.org/browse/HORNETQ-812
@Test
public void testJNDIPersistence() throws Exception {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.junit.Before;
import org.junit.Test;
@ -31,6 +32,10 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase {
private Map<SimpleString, PersistedRoles> mapExpectedSets;
public RolesConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -16,9 +16,19 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
@ -29,23 +39,39 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
@RunWith(Parameterized.class)
public abstract class StorageManagerTestBase extends ActiveMQTestBase {
protected ExecutorService executor;
protected ExecutorFactory execFactory;
protected JournalStorageManager journal;
protected StorageManager journal;
protected JMSStorageManager jmsJournal;
protected StoreConfiguration.StoreType storeType;
public StorageManagerTestBase(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
}
super.setUp();
execFactory = getOrderedExecutor();
@ -79,6 +105,15 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal = null;
}
// Stops the database engine early to stop thread leaks showing.
if (storeType == StoreConfiguration.StoreType.DATABASE) {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (SQLException e) {
}
}
super.tearDown();
if (exception != null)
throw exception;
@ -88,7 +123,13 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected void createStorage() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig());
}
else {
journal = createJournalStorageManager(createDefaultInVMConfig());
}
journal.start();
@ -106,6 +147,15 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
return jsm;
}
/**
* @param configuration
*/
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null);
addActiveMQComponent(jsm);
return jsm;
}
/**
* @throws Exception
*/
@ -115,4 +165,5 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal.start();
jmsJournal.load();
}
}