ARTEMIS-514 Add support for LargeMEssages backed by Database
This commit is contained in:
parent
c9b953433e
commit
1c3d63516f
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
* Default values of ActiveMQ Artemis configuration parameters.
|
||||
*/
|
||||
public final class ActiveMQDefaultConfiguration {
|
||||
/*
|
||||
/*
|
||||
* <p> In order to avoid compile time in-lining of constants, all access is done through methods
|
||||
* and all fields are PRIVATE STATIC but not FINAL. This is done following the recommendation at
|
||||
* <a href="http://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.9">13.4.9.
|
||||
|
@ -414,6 +414,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// Default bindings table name, used with Database storage type
|
||||
private static String DEFAULT_BINDINGS_TABLE_NAME = "BINDINGS";
|
||||
|
||||
// Default large messages table name, used with Database storage type
|
||||
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
|
||||
|
||||
/**
|
||||
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
|
||||
*/
|
||||
|
@ -1103,4 +1106,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
public static String getDefaultDriverClassName() {
|
||||
return DEFAULT_JDBC_DRIVER_CLASS_NAME;
|
||||
}
|
||||
|
||||
public static String getDefaultLargeMessagesTableName() {
|
||||
return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
|
||||
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
|
||||
|
||||
private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
|
||||
|
||||
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
|
||||
|
||||
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
|
||||
|
@ -49,6 +51,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
this.bindingsTableName = bindingsTableName;
|
||||
}
|
||||
|
||||
public String getLargeMessageTableName() {
|
||||
return largeMessagesTableName;
|
||||
}
|
||||
|
||||
public void setLargeMessageTableName(String largeMessagesTableName) {
|
||||
this.largeMessagesTableName = largeMessagesTableName;
|
||||
}
|
||||
|
||||
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
|
||||
this.jdbcConnectionUrl = jdbcConnectionUrl;
|
||||
}
|
||||
|
|
|
@ -1144,11 +1144,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
}
|
||||
|
||||
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) {
|
||||
NodeList databaseStoreNode = storeNode.getElementsByTagName("database-store");
|
||||
|
||||
DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration();
|
||||
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
|
||||
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
|
||||
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
|
||||
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
|
||||
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
|
||||
return conf;
|
||||
|
|
|
@ -16,13 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.persistence.impl.journal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
|
||||
|
@ -39,14 +42,25 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
|
||||
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
|
||||
protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
|
||||
try {
|
||||
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
|
||||
|
||||
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
|
||||
bindingsJournal = localBindings;
|
||||
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
|
||||
bindingsJournal = localBindings;
|
||||
|
||||
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
|
||||
messageJournal = localMessage;
|
||||
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
|
||||
messageJournal = localMessage;
|
||||
|
||||
bindingsJournal.start();
|
||||
messageJournal.start();
|
||||
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor);
|
||||
largeMessagesFactory.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
criticalErrorListener.onIOException(e, e.getMessage(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -76,7 +90,9 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
|||
|
||||
((JDBCJournalImpl) bindingsJournal).stop(false);
|
||||
|
||||
messageJournal.stop();
|
||||
((JDBCJournalImpl) messageJournal).stop(false);
|
||||
|
||||
largeMessagesFactory.stop();
|
||||
|
||||
singleThreadExecutor.shutdown();
|
||||
|
||||
|
@ -85,4 +101,12 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
|||
started = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer allocateDirectBuffer(int size) {
|
||||
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeDirectBuffer(ByteBuffer buffer) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,13 +66,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
|
||||
private SequentialFileFactory journalFF;
|
||||
|
||||
private SequentialFileFactory largeMessagesFactory;
|
||||
SequentialFileFactory largeMessagesFactory;
|
||||
|
||||
private Journal originalMessageJournal;
|
||||
|
||||
private Journal originalBindingsJournal;
|
||||
|
||||
private String largeMessagesDirectory;
|
||||
protected String largeMessagesDirectory;
|
||||
|
||||
private ReplicationManager replicator;
|
||||
|
||||
|
|
|
@ -143,6 +143,7 @@ public class OperationContextImpl implements OperationContext {
|
|||
}
|
||||
|
||||
// On this case, we can just execute the context directly
|
||||
|
||||
if (replicationLineUp.intValue() == replicated && storeLineUp.intValue() == stored &&
|
||||
pageLineUp.intValue() == paged) {
|
||||
// We want to avoid the executor if everything is complete...
|
||||
|
|
|
@ -1544,6 +1544,13 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="large-message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The table name used to large message files
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
</xsd:complexType>
|
||||
|
||||
|
|
|
@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
|
|||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
|
@ -396,20 +397,13 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return createDefaultConfig(0, netty);
|
||||
}
|
||||
|
||||
protected Configuration createDefaultJDBCConfig() throws Exception {
|
||||
Configuration configuration = createDefaultConfig(true);
|
||||
|
||||
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
|
||||
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
|
||||
dbStorageConfiguration.setBindingsTableName("BINDINGS");
|
||||
dbStorageConfiguration.setMessageTableName("MESSAGES");
|
||||
dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
|
||||
configuration.setStoreConfiguration(dbStorageConfiguration);
|
||||
|
||||
protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception {
|
||||
Configuration configuration = createDefaultConfig(isNetty);
|
||||
setDBStoreType(configuration);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
|
||||
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
|
||||
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
|
||||
|
||||
|
@ -448,6 +442,16 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return configuration;
|
||||
}
|
||||
|
||||
private void setDBStoreType(Configuration configuration) {
|
||||
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
|
||||
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
|
||||
dbStorageConfiguration.setBindingsTableName("BINDINGS");
|
||||
dbStorageConfiguration.setMessageTableName("MESSAGES");
|
||||
dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
|
||||
configuration.setStoreConfiguration(dbStorageConfiguration);
|
||||
}
|
||||
|
||||
protected Map<String, Object> generateInVMParams(final int node) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
|
||||
|
@ -1388,6 +1392,18 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return server;
|
||||
}
|
||||
|
||||
protected final ActiveMQServer createServer(final boolean realFiles,
|
||||
final Configuration configuration,
|
||||
final long pageSize,
|
||||
final long maxAddressSize,
|
||||
final Map<String, AddressSettings> settings,
|
||||
StoreConfiguration.StoreType storeType) {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
setDBStoreType(configuration);
|
||||
}
|
||||
return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
|
||||
}
|
||||
|
||||
protected final ActiveMQServer createServer(final boolean realFiles) throws Exception {
|
||||
return createServer(realFiles, false);
|
||||
}
|
||||
|
@ -1404,6 +1420,11 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
|
||||
}
|
||||
|
||||
protected ActiveMQServer createServer(final boolean realFiles, boolean isNetty, StoreConfiguration.StoreType storeType) throws Exception {
|
||||
Configuration configuration = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty) : createDefaultConfig(isNetty);
|
||||
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
|
||||
}
|
||||
|
||||
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
|
||||
final Configuration configuration,
|
||||
final NodeManager nodeManager,
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
|
||||
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
|
||||
<message-table-name>MESSAGE_TABLE</message-table-name>
|
||||
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
|
||||
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
|
||||
</database-store>
|
||||
</store>
|
||||
|
|
|
@ -376,6 +376,7 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
|
|||
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
|
||||
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
|
||||
<message-table-name>MESSAGE_TABLE</message-table-name>
|
||||
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
|
||||
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
|
||||
</database-store>
|
||||
</store>
|
||||
|
@ -384,13 +385,17 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
|
|||
- `jdbc-connection-url`
|
||||
|
||||
The full JDBC connection URL for your database server. The connection url should include all configuration parameters and database name.
|
||||
|
||||
|
||||
- `bindings-table-name`
|
||||
|
||||
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
|
||||
|
||||
|
||||
- `message-table-name`
|
||||
|
||||
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
|
||||
|
||||
- `large-message-table-name`
|
||||
|
||||
The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
|
||||
|
||||
- `jdbc-driver-class-name`
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -72,6 +73,10 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
protected ServerLocator locator;
|
||||
|
||||
public InterruptedLargeMessageTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -37,7 +38,8 @@ import org.junit.Test;
|
|||
*/
|
||||
public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest {
|
||||
|
||||
public LargeMessageAvoidLargeMessagesTest() {
|
||||
public LargeMessageAvoidLargeMessagesTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
isCompressedTest = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
|
@ -45,7 +46,8 @@ import org.junit.Test;
|
|||
public class LargeMessageCompressTest extends LargeMessageTest {
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
public LargeMessageCompressTest() {
|
||||
public LargeMessageCompressTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
isCompressedTest = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,17 +29,17 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
||||
|
@ -48,33 +48,35 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LargeMessageTest extends LargeMessageTestBase {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
static final int RECEIVE_WAIT_TIME = 10000;
|
||||
|
||||
private final int LARGE_MESSAGE_SIZE = 20 * 1024;
|
||||
|
||||
private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
protected ServerLocator locator;
|
||||
|
||||
protected boolean isCompressedTest = false;
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
private int largeMessageSize;
|
||||
|
||||
protected boolean isNetty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public LargeMessageTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
// The JDBC Large Message store is pretty slow, to speed tests up we only test 5MB large messages
|
||||
largeMessageSize = (storeType == StoreConfiguration.StoreType.DATABASE) ? 5 * 1024 : 100 * 1024;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackPartiallyConsumedBuffer() throws Exception {
|
||||
for (int i = 0; i < 1; i++) {
|
||||
|
@ -82,9 +84,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
internalTestRollbackPartiallyConsumedBuffer(false);
|
||||
tearDown();
|
||||
setUp();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -93,11 +93,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
}
|
||||
|
||||
private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception {
|
||||
final int messageSize = 100 * 1024;
|
||||
final int messageSize = largeMessageSize;
|
||||
|
||||
final ClientSession session;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
|
||||
AddressSettings settings = new AddressSettings();
|
||||
if (redeliveryDelay) {
|
||||
|
@ -184,7 +185,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -229,7 +230,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
public void testDeleteOnNoBinding() throws Exception {
|
||||
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -277,7 +278,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -338,7 +339,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
Configuration config = createDefaultConfig(isNetty()).setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024);
|
||||
Configuration config = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty()) : createDefaultConfig(isNetty());
|
||||
config.setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024);
|
||||
|
||||
ActiveMQServer server = createServer(true, config);
|
||||
|
||||
|
@ -396,7 +398,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -445,7 +447,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
session.close();
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -469,7 +471,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
session.commit();
|
||||
|
||||
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1);
|
||||
if (storeType != StoreConfiguration.StoreType.DATABASE) {
|
||||
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1);
|
||||
}
|
||||
|
||||
consumer = session.createConsumer(ADDRESS.concat("-2"));
|
||||
|
||||
|
@ -487,7 +491,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
session.close();
|
||||
|
||||
validateNoFilesOnLargeDir();
|
||||
if (storeType != StoreConfiguration.StoreType.DATABASE) {
|
||||
validateNoFilesOnLargeDir();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -496,7 +502,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -557,7 +563,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -665,7 +671,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -734,7 +740,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
session.close();
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -772,7 +778,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
ClientSession session = null;
|
||||
|
||||
try {
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -819,7 +825,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
session.close();
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -872,7 +878,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
ClientSession session = null;
|
||||
|
||||
try {
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -944,7 +950,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
ClientSession session = null;
|
||||
|
||||
try {
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1019,7 +1025,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
ClientSession session = null;
|
||||
|
||||
try {
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1090,12 +1096,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
@Test
|
||||
public void testFilePersistenceOneHugeMessage() throws Exception {
|
||||
testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
|
||||
testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceOneMessageStreaming() throws Exception {
|
||||
testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1105,22 +1111,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
@Test
|
||||
public void testFilePersistenceOneHugeMessageConsumer() throws Exception {
|
||||
testChunks(false, false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
|
||||
testChunks(false, false, false, true, true, false, false, false, true, 1, largeMessageSize, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistence() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, false, true, true, 2, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, false, false, true, true, 2, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceXA() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1135,122 +1141,122 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
@Test
|
||||
public void testFilePersistenceXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceXAConsumerRestart() throws Exception {
|
||||
testChunks(true, true, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, true, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlocked() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedXA() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACK() throws Exception {
|
||||
testChunks(false, false, true, false, true, true, true, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, true, true, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACKConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, true, true, true, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, true, true, true, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACKXA() throws Exception {
|
||||
testChunks(true, false, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACKXARestart() throws Exception {
|
||||
testChunks(true, true, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, true, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACKXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceBlockedPreACKXAConsumerRestart() throws Exception {
|
||||
testChunks(true, true, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, true, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceDelayed() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceDelayedConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceDelayedXA() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePersistenceDelayedXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistence() throws Exception {
|
||||
testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceXA() throws Exception {
|
||||
testChunks(true, false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, false, false, false, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
testChunks(true, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceDelayed() throws Exception {
|
||||
testChunks(false, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
testChunks(false, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceDelayedConsumer() throws Exception {
|
||||
testChunks(false, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
testChunks(false, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceDelayedXA() throws Exception {
|
||||
testChunks(true, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
testChunks(true, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPersistenceDelayedXAConsumer() throws Exception {
|
||||
testChunks(true, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
testChunks(true, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1343,7 +1349,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
// there are two bindings.. one is ACKed, the other is not, the server is restarted
|
||||
// The other binding is acked... The file must be deleted
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1410,7 +1416,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
public void testTwoBindings(final boolean restart) throws Exception {
|
||||
// there are two bindings.. one is ACKed, the other is not, the server is restarted
|
||||
// The other binding is acked... The file must be deleted
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1439,7 +1445,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1478,7 +1484,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
private void internalTestSendRollback(final boolean isXA, final boolean durable) throws Exception {
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1536,7 +1542,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
public void simpleRollbackInternalTest(final boolean isXA) throws Exception {
|
||||
// there are two bindings.. one is ACKed, the other is not, the server is restarted
|
||||
// The other binding is acked... The file must be deleted
|
||||
ActiveMQServer server = createServer(true, isNetty());
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1644,7 +1650,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int NUMBER_OF_MESSAGES = 30;
|
||||
try {
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1730,7 +1736,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int NUMBER_OF_MESSAGES = 1000;
|
||||
try {
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -1821,7 +1827,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
AddressSettings value = new AddressSettings();
|
||||
map.put(ADDRESS.toString(), value);
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
|
||||
server.start();
|
||||
|
||||
final int numberOfBytes = 1024;
|
||||
|
@ -1871,7 +1877,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
|
||||
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
|
||||
server.start();
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
@ -1948,7 +1954,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
AddressSettings value = new AddressSettings();
|
||||
map.put(ADDRESS.toString(), value);
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
|
||||
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
|
||||
server.start();
|
||||
|
||||
final int numberOfBytes = 1024;
|
||||
|
@ -2048,11 +2054,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int SIZE = 10 * 1024 * 1024;
|
||||
try {
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setMinLargeMessageSize(100 * 1024);
|
||||
locator.setMinLargeMessageSize(largeMessageSize * 1024);
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||
|
||||
|
@ -2081,7 +2087,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
msg2.acknowledge();
|
||||
|
||||
msg2.setOutputStream(createFakeOutputStream());
|
||||
Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
|
||||
Assert.assertTrue(msg2.waitOutputStreamCompletion(0));
|
||||
|
||||
session.commit();
|
||||
|
||||
|
@ -2089,6 +2095,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
|
||||
|
||||
}
|
||||
catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
throw t;
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
session.close();
|
||||
|
@ -2115,11 +2125,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int SIZE = 0;
|
||||
try {
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setMinLargeMessageSize(100 * 1024);
|
||||
locator.setMinLargeMessageSize(largeMessageSize * 1024);
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||
|
||||
|
@ -2184,11 +2194,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int SIZE = 0;
|
||||
try {
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setMinLargeMessageSize(100 * 1024);
|
||||
locator.setMinLargeMessageSize(largeMessageSize * 1024);
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||
|
||||
|
@ -2265,7 +2275,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
final int SIZE = 10 * 1024;
|
||||
final int NUMBER_OF_MESSAGES = 1;
|
||||
|
||||
server = createServer(true, isNetty());
|
||||
server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -2320,7 +2330,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
// The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
|
||||
@Test
|
||||
public void testSendServerMessage() throws Exception {
|
||||
ActiveMQServer server = createServer(true);
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
|
||||
server.start();
|
||||
|
||||
|
@ -2332,12 +2342,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
fileMessage.setMessageID(1005);
|
||||
|
||||
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
|
||||
for (int i = 0; i < largeMessageSize; i++) {
|
||||
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
|
||||
}
|
||||
|
||||
// The server would be doing this
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
|
||||
|
||||
fileMessage.releaseResources();
|
||||
|
||||
|
@ -2359,9 +2369,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
Assert.assertNotNull(msg);
|
||||
|
||||
Assert.assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
|
||||
Assert.assertEquals(msg.getBodySize(), largeMessageSize);
|
||||
|
||||
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
|
||||
for (int i = 0; i < largeMessageSize; i++) {
|
||||
Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), msg.getBodyBuffer().readByte());
|
||||
}
|
||||
|
||||
|
@ -2379,6 +2389,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
locator = createFactory(isNetty());
|
||||
locator.setCallTimeout(100000000);
|
||||
}
|
||||
|
||||
protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception {
|
||||
|
@ -2392,7 +2403,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
|
||||
AddressSettings value = new AddressSettings();
|
||||
map.put(ADDRESS.toString(), value);
|
||||
ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
|
||||
ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map, storeType);
|
||||
server.start();
|
||||
|
||||
final int numberOfBytes = 1024;
|
||||
|
@ -2435,7 +2446,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
if (realFiles) {
|
||||
server.stop();
|
||||
|
||||
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
|
||||
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
|
||||
server.start();
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
|
|||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -33,7 +34,7 @@ import org.junit.Test;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class JDBCJournalTest {
|
||||
public class JDBCJournalTest extends ActiveMQTestBase {
|
||||
|
||||
@Rule
|
||||
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
|
@ -35,18 +37,23 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.DeflaterReader;
|
||||
import org.junit.Assert;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
@ -66,6 +73,20 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
|
||||
// Protected -----------------------------------------------------
|
||||
|
||||
protected StoreConfiguration.StoreType storeType;
|
||||
|
||||
public LargeMessageTestBase(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
// Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
|
||||
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.DATABASE}};
|
||||
//Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
protected void testChunks(final boolean isXA,
|
||||
final boolean restartOnXA,
|
||||
final boolean rollbackFirstSend,
|
||||
|
@ -99,7 +120,15 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
final int minSize) throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
ActiveMQServer server = createServer(realFiles);
|
||||
Configuration configuration;
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig(true);
|
||||
}
|
||||
else {
|
||||
configuration = createDefaultConfig(false);
|
||||
}
|
||||
|
||||
ActiveMQServer server = createServer(realFiles, configuration);
|
||||
server.start();
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
|
@ -200,7 +229,7 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
if (realFiles) {
|
||||
server.stop();
|
||||
|
||||
server = createServer(realFiles);
|
||||
server = createServer(realFiles, configuration);
|
||||
server.start();
|
||||
|
||||
sf = locator.createSessionFactory();
|
||||
|
@ -352,13 +381,14 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
|
||||
@Override
|
||||
public void write(final byte[] b) throws IOException {
|
||||
if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
|
||||
bytesRead.addAndGet(b.length);
|
||||
if (b.length > 0) {
|
||||
if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
|
||||
bytesRead.addAndGet(b.length);
|
||||
}
|
||||
else {
|
||||
LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
|
||||
}
|
||||
}
|
||||
else {
|
||||
LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -426,12 +456,17 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
validateNoFilesOnLargeDir();
|
||||
|
||||
}
|
||||
catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
locator.close();
|
||||
try {
|
||||
server.stop();
|
||||
}
|
||||
catch (Throwable ignored) {
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -442,7 +477,7 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
* @param delayDelivery
|
||||
* @param session
|
||||
* @param producer
|
||||
* @throws FileNotFoundException
|
||||
* @throws Exception
|
||||
* @throws IOException
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
|
@ -523,7 +558,6 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
|
|||
* @param queueToRead
|
||||
* @param numberOfBytes
|
||||
* @throws ActiveMQException
|
||||
* @throws FileNotFoundException
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void readMessage(final ClientSession session,
|
||||
|
|
|
@ -125,7 +125,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
|
|||
protected void createStorage() throws Exception {
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig());
|
||||
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig(true));
|
||||
}
|
||||
else {
|
||||
journal = createJournalStorageManager(createDefaultInVMConfig());
|
||||
|
|
|
@ -101,7 +101,7 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
|
|||
addressSettings.clear();
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true);
|
||||
configuration = createDefaultJDBCConfig(true).setJMXManagementEnabled(true);
|
||||
}
|
||||
else {
|
||||
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);
|
||||
|
|
|
@ -89,7 +89,7 @@ public class BasicXaTest extends ActiveMQTestBase {
|
|||
addressSettings.clear();
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig();
|
||||
configuration = createDefaultJDBCConfig(true);
|
||||
}
|
||||
else {
|
||||
configuration = createDefaultNettyConfig();
|
||||
|
|
|
@ -99,7 +99,7 @@ public class XaTimeoutTest extends ActiveMQTestBase {
|
|||
addressSettings.clear();
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig();
|
||||
configuration = createDefaultJDBCConfig(true);
|
||||
}
|
||||
else {
|
||||
configuration = createBasicConfig();
|
||||
|
|
|
@ -16,11 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.stress.chunk;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LargeMessageStressTest extends LargeMessageTestBase {
|
||||
|
||||
public LargeMessageStressTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
}
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue