This closes #872

This commit is contained in:
jbertram 2016-10-31 15:00:45 -05:00
commit cdb52b8a0f
75 changed files with 1465 additions and 1196 deletions

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
import org.apache.activemq.artemis.utils.Base64;
@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract {
long lineNumber = 0;
Map<Long, JournalRecord> journalRecords = journal.getRecords();
while ((line = buffReader.readLine()) != null) {
lineNumber++;
String[] splitLine = line.split(",");
@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract {
counter.incrementAndGet();
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
} else if (operation.equals("AddRecordTX")) {
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
} else if (operation.equals("UpdateTX")) {
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract {
} else if (operation.equals("DeleteRecord")) {
long id = parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
if (journalRecords.get(id) != null) {
try {
journal.appendDeleteRecord(id, false);
} catch (IllegalStateException ignored) {
// If not found it means the append/update records were reclaimed already
}
} else if (operation.equals("DeleteRecordTX")) {
long txID = parseLong("txID", lineProperties);
long id = parseLong("id", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
if (journalRecords.get(id) != null) {
journal.appendDeleteRecordTransactional(txID, id);
}
journal.appendDeleteRecordTransactional(txID, id);
} else if (operation.equals("Prepare")) {
long txID = parseLong("txID", lineProperties);
int numberOfRecords = parseInt("numberOfRecords", lineProperties);

View File

@ -90,6 +90,7 @@ import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorag
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends OptionalLocking {
@ -142,15 +143,10 @@ public final class XmlDataExporter extends OptionalLocking {
String pagingDir,
String largeMessagesDir) throws Exception {
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new ExecutorFactory() {
@Override
public Executor getExecutor() {
return executor;
}
};
final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
storageManager = new JournalStorageManager(config, executorFactory);
storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
XMLOutputFactory factory = XMLOutputFactory.newInstance();
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
@ -158,6 +154,8 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
writeXMLData();
executor.shutdown();
}
private void writeXMLData() throws Exception {

View File

@ -22,7 +22,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.jboss.logging.Logger;
/**
@ -104,7 +103,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
// This could happen during shutdowns. Nothing to be concerned about here
logger.debug("Interrupted Thread", e);
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
logger.warn(t.getMessage(), t);
}
task = tasks.poll();
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleFuture<V> implements Future<V> {
public SimpleFuture() {
}
V value;
Exception exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
public void fail(Exception e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
}
return value;
}
public void set(V v) {
this.value = v;
latch.countDown();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class SimpleFutureTest {
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
@Test
public void testFuture() throws Exception {
final long randomStart = System.currentTimeMillis();
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
Thread t = new Thread() {
@Override
public void run() {
simpleFuture.set(randomStart);
}
};
t.start();
Assert.assertEquals(randomStart, simpleFuture.get().longValue());
}
@Test
public void testException() throws Exception {
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
Thread t = new Thread() {
@Override
public void run() {
simpleFuture.fail(new Exception("hello"));
}
};
t.start();
boolean failed = false;
try {
simpleFuture.get();
} catch (Exception e) {
failed = true;
}
Assert.assertTrue(failed);
}
}

View File

@ -284,8 +284,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
theCredits.acquireCredits(creditSize);
session.checkDefaultAddress(sendingAddress);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
}

View File

@ -135,8 +135,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private volatile boolean mayAttemptToFailover = true;
private volatile SimpleString defaultAddress;
/**
* Current XID. this will be used in case of failover
*/
@ -957,7 +955,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// want
// to recreate the session, we just want to unblock the blocking call
if (!inClose && mayAttemptToFailover) {
sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) {
@ -1036,27 +1034,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void setAddress(final Message message, final SimpleString address) {
if (defaultAddress == null) {
logger.tracef("setAddress() Setting default address as %s", address);
logger.tracef("setAddress() Setting default address as %s", address);
message.setAddress(address);
} else {
if (!address.equals(defaultAddress)) {
logger.tracef("setAddress() setting non default address %s on message", address);
message.setAddress(address);
} else {
logger.trace("setAddress() being set as null");
message.setAddress(null);
}
}
}
@Override
public void checkDefaultAddress(SimpleString address) {
if (defaultAddress == null) {
logger.tracef("checkDefaultAddress(%s)", address);
defaultAddress = address;
}
message.setAddress(address);
}
@Override

View File

@ -93,8 +93,6 @@ public interface ClientSessionInternal extends ClientSession {
*/
void setAddress(Message message, SimpleString address);
void checkDefaultAddress(SimpleString address);
void setPacketSize(int packetSize);
void resetIfNeeded() throws ActiveMQException;

View File

@ -629,9 +629,8 @@ public class ActiveMQSessionContext extends SessionContext {
final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
final SimpleString defaultAddress) throws ActiveMQException {
Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
final boolean preAcknowledge) throws ActiveMQException {
Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
boolean retry;
do {
try {
@ -662,9 +661,8 @@ public class ActiveMQSessionContext extends SessionContext {
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
boolean preAcknowledge) {
return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
}
@Override

View File

@ -250,8 +250,7 @@ public abstract class SessionContext {
final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
final SimpleString defaultAddress) throws ActiveMQException;
final boolean preAcknowledge) throws ActiveMQException;
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;

View File

@ -1,143 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.jboss.logging.Logger;
public class JDBCUtils {
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
public static Driver getDriver(String className) throws Exception {
try {
Driver driver = (Driver) Class.forName(className).newInstance();
// Shutdown the derby if using the derby embedded driver.
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
}
}
});
}
return driver;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + className);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate driver class: ", e);
}
}
public static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
logger.tracef("Validating if table %s didn't exist before creating", tableName);
try {
connection.setAutoCommit(false);
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs != null && !rs.next()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, sql);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(sql);
}
}
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
}
}
public static SQLProvider.Factory getSQLProviderFactory(String url) {
SQLProvider.Factory factory;
if (url.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
factory = new DerbySQLProvider.Factory();
} else if (url.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
factory = new PostgresSQLProvider.Factory();
} else if (url.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
factory = new MySQLSQLProvider.Factory();
} else {
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
factory = new GenericSQLProvider.Factory();
}
return factory;
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
SQLProvider.Factory factory;
if (driverClass.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new DerbySQLProvider.Factory();
} else if (driverClass.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new PostgresSQLProvider.Factory();
} else if (driverClass.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new MySQLSQLProvider.Factory();
} else {
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new GenericSQLProvider.Factory();
}
return factory.create(tableName);
}
public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
return dbDriver;
}
public static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource,
String tableName,
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
if (provider instanceof PostgresSQLProvider) {
dbDriver = new PostgresSequentialSequentialFileDriver();
dbDriver.setDataSource(dataSource);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(tableName, dataSource, provider);
}
return dbDriver;
}
}

View File

@ -19,30 +19,32 @@ package org.apache.activemq.artemis.jdbc.store.drivers;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
/**
* Class to hold common database functionality such as drivers and connections
*/
public abstract class AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
protected Connection connection;
protected SQLProvider sqlProvider;
protected String jdbcConnectionUrl;
private String jdbcConnectionUrl;
protected String jdbcDriverClass;
private String jdbcDriverClass;
protected Driver dbDriver;
protected DataSource dataSource;
private DataSource dataSource;
public AbstractJDBCDriver() {
}
@ -75,7 +77,7 @@ public abstract class AbstractJDBCDriver {
protected abstract void createSchema() throws SQLException;
protected void createTable(String schemaSql) throws SQLException {
JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
}
protected void connect() throws Exception {
@ -83,7 +85,7 @@ public abstract class AbstractJDBCDriver {
connection = dataSource.getConnection();
} else {
try {
dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
Driver dbDriver = getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
} catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
@ -105,6 +107,48 @@ public abstract class AbstractJDBCDriver {
}
}
private static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
logger.tracef("Validating if table %s didn't exist before creating", tableName);
try {
connection.setAutoCommit(false);
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs != null && !rs.next()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, sql);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(sql);
}
}
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
}
}
private Driver getDriver(String className) throws Exception {
try {
Driver driver = (Driver) Class.forName(className).newInstance();
// Shutdown the derby if using the derby embedded driver.
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
}
}
});
}
return driver;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + className);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate driver class: ", e);
}
}
public Connection getConnection() {
return connection;
}
@ -113,34 +157,18 @@ public abstract class AbstractJDBCDriver {
this.connection = connection;
}
public SQLProvider getSqlProvider() {
return sqlProvider;
}
public void setSqlProvider(SQLProvider sqlProvider) {
this.sqlProvider = sqlProvider;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcDriverClass() {
return jdbcDriverClass;
}
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
public DataSource getDataSource() {
return dataSource;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.jboss.logging.Logger;
public class JDBCUtils {
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
public static SQLProvider.Factory getSQLProviderFactory(String url) {
SQLProvider.Factory factory;
if (url.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
factory = new DerbySQLProvider.Factory();
} else if (url.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
factory = new PostgresSQLProvider.Factory();
} else if (url.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
factory = new MySQLSQLProvider.Factory();
} else {
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
factory = new GenericSQLProvider.Factory();
}
return factory;
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
SQLProvider.Factory factory;
if (driverClass.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new DerbySQLProvider.Factory();
} else if (driverClass.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new PostgresSQLProvider.Factory();
} else if (driverClass.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new MySQLSQLProvider.Factory();
} else {
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new GenericSQLProvider.Factory();
}
return factory.create(tableName);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
class JDBCFileUtils {
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
return dbDriver;
}
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
if (provider instanceof PostgresSQLProvider) {
dbDriver = new PostgresSequentialSequentialFileDriver();
dbDriver.setDataSource(dataSource);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
}
return dbDriver;
}
}

View File

@ -64,11 +64,11 @@ public class JDBCSequentialFile implements SequentialFile {
// Allows DB Drivers to cache meta data.
private final Map<Object, Object> metaData = new ConcurrentHashMap<>();
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final Executor executor,
final JDBCSequentialFileFactoryDriver driver,
final Object writeLock) throws SQLException {
JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final Executor executor,
final JDBCSequentialFileFactoryDriver driver,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
@ -77,7 +77,7 @@ public class JDBCSequentialFile implements SequentialFile {
this.dbDriver = driver;
}
public void setWritePosition(int writePosition) {
void setWritePosition(int writePosition) {
this.writePosition = writePosition;
}
@ -172,7 +172,7 @@ public class JDBCSequentialFile implements SequentialFile {
return internalWrite(buffer.array(), callback);
}
public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
@ -181,7 +181,7 @@ public class JDBCSequentialFile implements SequentialFile {
});
}
public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
@ -358,10 +358,6 @@ public class JDBCSequentialFile implements SequentialFile {
metaData.put(key, value);
}
public Object removeMetaData(Object key) {
return metaData.remove(key);
}
public Object getMetaData(Object key) {
return metaData.get(key);
}

View File

@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
@ -48,10 +47,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
public JDBCSequentialFileFactory(final DataSource dataSource,
final SQLProvider sqlProvider,
final String tableName,
Executor executor) throws Exception {
this.executor = executor;
dbDriver = JDBCUtils.getDBFileDriver(dataSource, tableName, sqlProvider);
dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
}
public JDBCSequentialFileFactory(final String connectionUrl,
@ -59,7 +57,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
final SQLProvider sqlProvider,
Executor executor) throws Exception {
this.executor = executor;
dbDriver = JDBCUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
}
@Override
@ -88,9 +86,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
if (fileLocks.get(fileName) == null) {
fileLocks.put(fileName, new Object());
}
fileLocks.putIfAbsent(fileName, new Object());
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName));
files.add(file);
return file;

View File

@ -33,25 +33,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
protected PreparedStatement deleteFile;
protected PreparedStatement createFile;
PreparedStatement createFile;
protected PreparedStatement selectFileByFileName;
private PreparedStatement selectFileByFileName;
protected PreparedStatement copyFileRecord;
private PreparedStatement copyFileRecord;
protected PreparedStatement renameFile;
private PreparedStatement renameFile;
protected PreparedStatement readLargeObject;
PreparedStatement readLargeObject;
protected PreparedStatement appendToLargeObject;
private PreparedStatement appendToLargeObject;
protected PreparedStatement selectFileNamesByExtension;
private PreparedStatement selectFileNamesByExtension;
public JDBCSequentialFileFactoryDriver() {
JDBCSequentialFileFactoryDriver() {
super();
}
public JDBCSequentialFileFactoryDriver(String tableName, DataSource dataSource, SQLProvider provider) {
JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) {
super(dataSource, provider);
}

View File

@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
package org.apache.activemq.artemis.jdbc.store.file;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
@ -51,7 +52,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private static final Logger logger = Logger.getLogger(JDBCJournalImpl.class);
// Sync Delay in ms
public static final int SYNC_DELAY = 5;
private static final int SYNC_DELAY = 5;
private static int USER_VERSION = 1;
@ -112,6 +113,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
started = true;
}
@Override
public void flush() throws Exception {
}
@Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateJournalTableSQL());
@ -741,4 +746,24 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
return started;
}
private static class JDBCJournalSync extends ActiveMQScheduledComponent {
private final JDBCJournalImpl journal;
JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod,
TimeUnit timeUnit,
JDBCJournalImpl journal) {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
this.journal = journal;
}
@Override
public void run() {
if (journal.isStarted()) {
journal.sync();
}
}
}
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
public class JDBCJournalLoaderCallback implements LoaderCallback {
class JDBCJournalLoaderCallback implements LoaderCallback {
private final List<PreparedTransactionInfo> preparedTransactions;
@ -41,16 +41,16 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
private long maxId = -1;
public JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) {
JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) {
this.committedRecords = committedRecords;
this.preparedTransactions = preparedTransactions;
this.failureCallback = failureCallback;
}
public synchronized void checkMaxId(long id) {
private synchronized void checkMaxId(long id) {
if (maxId < id) {
maxId = id;
}

View File

@ -27,13 +27,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCJournalReaderCallback implements JournalReaderCallback {
class JDBCJournalReaderCallback implements JournalReaderCallback {
private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<>();
private final LoaderCallback loadManager;
public JDBCJournalReaderCallback(final LoaderCallback loadManager) {
JDBCJournalReaderCallback(final LoaderCallback loadManager) {
this.loadManager = loadManager;
}
@ -126,7 +126,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
// Not needed for JDBC journal impl
}
public void checkPreparedTx() {
void checkPreparedTx() {
for (TransactionHolder transaction : loadTransactions.values()) {
if ((!transaction.prepared && !transaction.committed) || transaction.invalid) {
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);

View File

@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
public class JDBCJournalRecord {
class JDBCJournalRecord {
/*
Database Table Schema:
@ -49,17 +49,17 @@ public class JDBCJournalRecord {
*/
// Record types taken from Journal Impl
public static final byte ADD_RECORD = 11;
public static final byte UPDATE_RECORD = 12;
public static final byte ADD_RECORD_TX = 13;
public static final byte UPDATE_RECORD_TX = 14;
static final byte ADD_RECORD = 11;
static final byte UPDATE_RECORD = 12;
static final byte ADD_RECORD_TX = 13;
static final byte UPDATE_RECORD_TX = 14;
public static final byte DELETE_RECORD_TX = 15;
public static final byte DELETE_RECORD = 16;
static final byte DELETE_RECORD_TX = 15;
static final byte DELETE_RECORD = 16;
public static final byte PREPARE_RECORD = 17;
public static final byte COMMIT_RECORD = 18;
public static final byte ROLLBACK_RECORD = 19;
static final byte PREPARE_RECORD = 17;
static final byte COMMIT_RECORD = 18;
static final byte ROLLBACK_RECORD = 19;
// Callback and sync operations
private IOCompletion ioCompletion = null;
@ -90,7 +90,7 @@ public class JDBCJournalRecord {
private long seq;
public JDBCJournalRecord(long id, byte recordType, long seq) {
JDBCJournalRecord(long id, byte recordType, long seq) {
this.id = id;
this.recordType = recordType;
@ -110,26 +110,6 @@ public class JDBCJournalRecord {
this.seq = seq;
}
public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
}
public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
}
public static String selectRecordsSQL(String tableName) {
return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
}
public static String deleteRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id = ?";
}
public static String deleteJournalTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=?";
}
public void complete(boolean success) {
if (ioCompletion != null) {
if (success) {
@ -146,7 +126,7 @@ public class JDBCJournalRecord {
}
}
protected void writeRecord(PreparedStatement statement) throws SQLException {
void writeRecord(PreparedStatement statement) throws SQLException {
byte[] recordBytes = new byte[variableSize];
byte[] txDataBytes = new byte[txDataSize];
@ -172,12 +152,12 @@ public class JDBCJournalRecord {
statement.addBatch();
}
protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
deleteStatement.setLong(1, id);
deleteStatement.addBatch();
}
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11));
record.setCompactCount((byte) rs.getShort(3));
record.setTxId(rs.getLong(4));
@ -190,18 +170,14 @@ public class JDBCJournalRecord {
return record;
}
public IOCompletion getIoCompletion() {
IOCompletion getIoCompletion() {
return ioCompletion;
}
public void setIoCompletion(IOCompletion ioCompletion) {
void setIoCompletion(IOCompletion ioCompletion) {
this.ioCompletion = ioCompletion;
}
public boolean isStoreLineUp() {
return storeLineUp;
}
public void setStoreLineUp(boolean storeLineUp) {
this.storeLineUp = storeLineUp;
}
@ -222,27 +198,23 @@ public class JDBCJournalRecord {
return recordType;
}
public byte getCompactCount() {
byte getCompactCount() {
return compactCount;
}
public void setCompactCount(byte compactCount) {
private void setCompactCount(byte compactCount) {
this.compactCount = compactCount;
}
public long getTxId() {
long getTxId() {
return txId;
}
public void setTxId(long txId) {
void setTxId(long txId) {
this.txId = txId;
}
public int getVariableSize() {
return variableSize;
}
public void setVariableSize(int variableSize) {
private void setVariableSize(int variableSize) {
this.variableSize = variableSize;
}
@ -277,31 +249,19 @@ public class JDBCJournalRecord {
return record;
}
public int getTxCheckNoRecords() {
int getTxCheckNoRecords() {
return txCheckNoRecords;
}
public void setTxCheckNoRecords(int txCheckNoRecords) {
private void setTxCheckNoRecords(int txCheckNoRecords) {
this.txCheckNoRecords = txCheckNoRecords;
}
public void setTxDataSize(int txDataSize) {
private void setTxDataSize(int txDataSize) {
this.txDataSize = txDataSize;
}
public int getTxDataSize() {
return txDataSize;
}
public InputStream getTxData() {
return txData;
}
public void setTxData(InputStream record) {
this.record = record;
}
public void setTxData(EncodingSupport txData) {
void setTxData(EncodingSupport txData) {
this.txDataSize = txData.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(txDataSize);
@ -309,7 +269,7 @@ public class JDBCJournalRecord {
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
}
public void setTxData(byte[] txData) {
void setTxData(byte[] txData) {
if (txData != null) {
this.txDataSize = txData.length;
this.txData = new ByteArrayInputStream(txData);
@ -320,19 +280,19 @@ public class JDBCJournalRecord {
return isUpdate;
}
public byte[] getRecordData() throws IOException {
private byte[] getRecordData() throws IOException {
byte[] data = new byte[variableSize];
record.read(data);
return data;
}
public byte[] getTxDataAsByteArray() throws IOException {
byte[] getTxDataAsByteArray() throws IOException {
byte[] data = new byte[txDataSize];
txData.read(data);
return data;
}
public RecordInfo toRecordInfo() throws IOException {
RecordInfo toRecordInfo() throws IOException {
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
}
@ -340,7 +300,7 @@ public class JDBCJournalRecord {
return isTransactional;
}
public long getSeq() {
long getSeq() {
return seq;
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
public class JDBCJournalSync extends ActiveMQScheduledComponent {
private final JDBCJournalImpl journal;
public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod,
TimeUnit timeUnit,
JDBCJournalImpl journal) {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
this.journal = journal;
}
@Override
public void run() {
if (journal.isStarted()) {
journal.sync();
}
}
}

View File

@ -30,7 +30,7 @@ final class TransactionHolder {
public final long transactionID;
public final List<RecordInfo> recordInfos = new ArrayList<>();
final List<RecordInfo> recordInfos = new ArrayList<>();
public final List<RecordInfo> recordsToDelete = new ArrayList<>();
@ -38,7 +38,7 @@ final class TransactionHolder {
public boolean invalid;
public byte[] extraData;
byte[] extraData;
public boolean committed;
}

View File

@ -32,7 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -53,10 +53,6 @@ public class JDBCSequentialFileFactoryTest {
@Rule
public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
private static String connectionUrl = "jdbc:derby:target/data;create=true";
private static String tableName = "FILES";
private static String className = EmbeddedDriver.class.getCanonicalName();
private JDBCSequentialFileFactory factory;
@ -65,6 +61,8 @@ public class JDBCSequentialFileFactoryTest {
public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor);
factory.start();
}
@ -198,7 +196,7 @@ public class JDBCSequentialFileFactoryTest {
fail(errorMessage);
}
public void assertEmpty(int timeout) throws InterruptedException {
void assertEmpty(int timeout) throws InterruptedException {
countDownLatch.await(timeout, TimeUnit.SECONDS);
assertEquals(countDownLatch.getCount(), 0);
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
@ -73,7 +74,8 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JMSJournalStorageManagerImpl(final IDGenerator idGenerator,
public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors,
final IDGenerator idGenerator,
final Configuration config,
final ReplicationManager replicator) {
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) {
@ -86,7 +88,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);

View File

@ -1544,16 +1544,13 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* @throws Exception
*/
private void createJournal() throws Exception {
if (storage == null) {
if (coreConfig.isPersistenceEnabled()) {
storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
} else {
storage = new NullJMSStorageManagerImpl();
}
if (storage != null) {
storage.stop();
}
if (coreConfig.isPersistenceEnabled()) {
storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
} else {
if (storage.isStarted()) {
storage.stop();
}
storage = new NullJMSStorageManagerImpl();
}
storage.start();

View File

@ -237,4 +237,9 @@ public interface Journal extends ActiveMQComponent {
* only be called once the synchronization of the backup and live servers is completed.
*/
void replicationSyncFinished();
/**
* It will make sure there are no more pending operations on the Executors.
* */
void flush() throws Exception;
}

View File

@ -98,6 +98,10 @@ public final class FileWrapperJournal extends JournalBase {
writeRecord(addRecord, sync, callback);
}
@Override
public void flush() throws Exception {
}
/**
* Write the record to the current file.
*/

View File

@ -17,11 +17,13 @@
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -45,12 +47,14 @@ public class JournalTransaction {
private boolean compacting = false;
private Map<JournalFile, TransactionCallback> callbackList;
private final Map<JournalFile, TransactionCallback> callbackList = Collections.synchronizedMap(new HashMap<JournalFile, TransactionCallback>());
private JournalFile lastFile = null;
private final AtomicInteger counter = new AtomicInteger();
private CountDownLatch firstCallbackLatch;
public JournalTransaction(final long id, final JournalRecordProvider journal) {
this.id = id;
this.journal = journal;
@ -139,9 +143,7 @@ public class JournalTransaction {
pendingFiles.clear();
}
if (callbackList != null) {
callbackList.clear();
}
callbackList.clear();
if (pos != null) {
pos.clear();
@ -156,6 +158,8 @@ public class JournalTransaction {
lastFile = null;
currentCallback = null;
firstCallbackLatch = null;
}
/**
@ -166,9 +170,13 @@ public class JournalTransaction {
data.setNumberOfRecords(getCounter(currentFile));
}
public TransactionCallback getCurrentCallback() {
return currentCallback;
}
public TransactionCallback getCallback(final JournalFile file) throws Exception {
if (callbackList == null) {
callbackList = new HashMap<>();
if (firstCallbackLatch != null && callbackList.isEmpty()) {
firstCallbackLatch.countDown();
}
currentCallback = callbackList.get(file);
@ -178,15 +186,19 @@ public class JournalTransaction {
callbackList.put(file, currentCallback);
}
if (currentCallback.getErrorMessage() != null) {
throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
}
currentCallback.countUp();
return currentCallback;
}
public void checkErrorCondition() throws Exception {
if (currentCallback != null) {
if (currentCallback.getErrorMessage() != null) {
throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
}
}
}
public void addPositive(final JournalFile file, final long id, final int size) {
incCounter(file);
@ -217,7 +229,7 @@ public class JournalTransaction {
public void commit(final JournalFile file) {
JournalCompactor compactor = journal.getCompactor();
if (compacting) {
if (compacting && compactor != null) {
compactor.addCommandCommit(this, file);
} else {
@ -264,7 +276,8 @@ public class JournalTransaction {
}
public void waitCallbacks() throws InterruptedException {
if (callbackList != null) {
waitFirstCallback();
synchronized (callbackList) {
for (TransactionCallback callback : callbackList.values()) {
callback.waitCompletion();
}
@ -275,8 +288,15 @@ public class JournalTransaction {
* Wait completion at the latest file only
*/
public void waitCompletion() throws Exception {
if (currentCallback != null) {
currentCallback.waitCompletion();
waitFirstCallback();
currentCallback.waitCompletion();
}
private void waitFirstCallback() throws InterruptedException {
if (currentCallback == null) {
firstCallbackLatch = new CountDownLatch(1);
firstCallbackLatch.await();
firstCallbackLatch = null;
}
}

View File

@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void compactReadError(JournalFile file);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting",
@Message(id = 142012, value = "Couldn't find tx={0} to merge after compacting",
format = Message.Format.MESSAGE_FORMAT)
void compactMergeError(Long id);
@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void uncomittedTxFound(Long id);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds",
@Message(id = 142016, value = "Could not stop compactor executor after 120 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopCompactor();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds",
@Message(id = 142017, value = "Could not stop journal executor after 60 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopJournalExecutor();
@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void deletingOrphanedFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
@Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
void errorClosingFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger {
@Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT)
void errorSubmittingWrite(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT)
void couldNotStopJournalAppendExecutor();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT)
void errorDeletingFile(Object e);

View File

@ -63,9 +63,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
SimpleString defaultAddress) {
return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
boolean preAcknowledge) {
return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), null);
}
@Override

View File

@ -23,7 +23,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTrackerCustomizer;

View File

@ -1189,7 +1189,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public synchronized boolean closeConnectionsForAddress(final String ipAddress) {
public boolean closeConnectionsForAddress(final String ipAddress) {
checkStarted();
clearIO();
@ -1213,7 +1213,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public synchronized boolean closeConsumerConnectionsForAddress(final String address) {
public boolean closeConsumerConnectionsForAddress(final String address) {
boolean closed = false;
checkStarted();
@ -1251,7 +1251,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public synchronized boolean closeConnectionsForUser(final String userName) {
public boolean closeConnectionsForUser(final String userName) {
boolean closed = false;
checkStarted();

View File

@ -410,7 +410,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
@ -446,7 +446,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
// returns just the first, as it's the first only
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
@ -499,7 +499,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (filter == null) {
return getMessageCount();
} else {
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
int count = 0;
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
@ -895,7 +895,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ArrayList<CompositeData> c = new ArrayList<>();
Filter filter = FilterImpl.createFilter(filterStr);
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext() && currentPageSize++ < pageSize) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {

View File

@ -56,7 +56,10 @@ public interface PageSubscription {
LinkedListIterator<PagedReference> iterator();
// To be called when the cursor is closed for good. Most likely when the queue is deleted
LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void destroy() throws Exception;
void scheduleCleanupCheck();

View File

@ -251,7 +251,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
recordID = -1;
value.set(0);
added.set(0);
incrementRecords.clear();
}
} finally {

View File

@ -192,7 +192,8 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void reloadPageCompletion(PagePosition position) throws Exception {
// if the current page is complete, we must move it out of the way
if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
if (pageStore != null && pageStore.getCurrentPage() != null &&
pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
pageStore.forceAnotherPage();
}
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
@ -351,6 +352,11 @@ final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator();
}
@Override
public PageIterator iterator(boolean browsing) {
return new CursorIterator(browsing);
}
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();
@ -1100,6 +1106,8 @@ final class PageSubscriptionImpl implements PageSubscription {
private volatile PagedReference lastRedelivery = null;
private final boolean browsing;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<>();
@ -1109,7 +1117,13 @@ final class PageSubscriptionImpl implements PageSubscription {
*/
private volatile PagedReference cachedNext;
private CursorIterator(boolean browsing) {
this.browsing = browsing;
}
private CursorIterator() {
this.browsing = false;
}
@Override
@ -1199,7 +1213,7 @@ final class PageSubscriptionImpl implements PageSubscription {
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
continue;
}
@ -1225,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// nothing
// is being changed. That's why the false is passed as a parameter here
if (info != null && info.isRemoved(message.getPosition())) {
if (!browsing && info != null && info.isRemoved(message.getPosition())) {
valid = false;
}
}
@ -1237,10 +1251,10 @@ final class PageSubscriptionImpl implements PageSubscription {
if (valid) {
match = match(message.getMessage());
if (!match) {
if (!browsing && !match) {
processACK(message.getPosition());
}
} else if (ignored) {
} else if (!browsing && ignored) {
positionIgnored(message.getPosition());
}
} while (!match);

View File

@ -19,11 +19,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import javax.transaction.xa.Xid;
import java.io.File;
import java.io.FileInputStream;
import java.security.AccessController;
import java.security.DigestInputStream;
import java.security.InvalidParameterException;
import java.security.MessageDigest;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -34,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -103,7 +99,6 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
@ -151,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected BatchingIDGenerator idGenerator;
protected final ExecutorFactory ioExecutors;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
@ -168,7 +165,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
final Executor executor;
ExecutorService singleThreadExecutor;
Executor singleThreadExecutor;
private final boolean syncTransactional;
@ -191,18 +188,22 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
public AbstractJournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService) {
this(config, executorFactory, scheduledExecutorService, null);
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory ioExecutors) {
this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
}
public AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory ioExecutors,
IOCriticalErrorListener criticalErrorListener) {
this.executorFactory = executorFactory;
this.ioCriticalErrorListener = criticalErrorListener;
this.ioExecutors = ioExecutors;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
@ -286,10 +287,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
OperationContextImpl.setContext(context);
}
public Executor getSingleThreadExecutor() {
return singleThreadExecutor;
}
@Override
public OperationContext newSingleThreadContext() {
return newContext(singleThreadExecutor);
@ -1429,12 +1426,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
beforeStart();
singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader());
}
}));
singleThreadExecutor = executorFactory.getExecutor();
bindingsJournal.start();
@ -1490,8 +1482,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
messageJournal.stop();
singleThreadExecutor.shutdown();
journalLoaded = false;
started = false;

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
@ -36,15 +36,17 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
public JDBCJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ExecutorFactory ioExecutorFactory,
ScheduledExecutorService scheduledExecutorService) {
super(config, executorFactory, scheduledExecutorService);
super(config, executorFactory, scheduledExecutorService, ioExecutorFactory);
}
public JDBCJournalStorageManager(final Configuration config,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory executorFactory,
final ExecutorFactory ioExecutorFactory,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
super(config, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener);
}
@Override
@ -59,7 +61,7 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
}
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName()), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName()), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), dbConf.getLargeMessageTableName(), executor);
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), executor);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), scheduledExecutorService, executorFactory.getExecutor());
@ -101,8 +103,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
messageJournal.stop();
largeMessagesFactory.stop();
singleThreadExecutor.shutdown();
journalLoaded = false;
started = false;

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -85,25 +86,28 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService) {
this(config, executorFactory, scheduledExecutorService, null);
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory ioExecutors) {
this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
}
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
this(config, executorFactory, null, null);
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ExecutorFactory ioExecutors) {
this(config, executorFactory, null, ioExecutors, null);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory ioExecutors,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
super(config, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ExecutorFactory ioExecutors,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, null, criticalErrorListener);
super(config, executorFactory, null, ioExecutors, criticalErrorListener);
}
@Override
@ -115,7 +119,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@ -131,7 +135,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
messageJournal = localMessage;
originalMessageJournal = localMessage;
@ -197,14 +202,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
try {
executor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
latch.await(30, TimeUnit.SECONDS);
latch.await(30, TimeUnit.SECONDS);
} catch (RejectedExecutionException ignored) {
// that's ok
}
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
@ -225,8 +234,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
messageJournal.stop();
singleThreadExecutor.shutdown();
journalLoaded = false;
started = false;

View File

@ -64,6 +64,11 @@ public class ReplicatedJournal implements Journal {
this.replicationManager = replicationManager;
}
@Override
public void flush() throws Exception {
}
/**
* @param id
* @param recordType

View File

@ -345,6 +345,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
ExecutorFactory getExecutorFactory();
ExecutorFactory getIOExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
GroupingHandler getGroupingHandler();

View File

@ -195,7 +195,7 @@ public interface Queue extends Bindable {
*/
LinkedListIterator<MessageReference> iterator();
LinkedListIterator<MessageReference> totalIterator();
LinkedListIterator<MessageReference> browserIterator();
SimpleString getExpiryAddress();

View File

@ -36,6 +36,10 @@ public interface ServiceRegistry {
void setExecutorService(ExecutorService executorService);
ExecutorService getIOExecutorService();
void setIOExecutorService(ExecutorService ioExecutorService);
ScheduledExecutorService getScheduledExecutorService();
void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService);

View File

@ -38,11 +38,12 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -150,6 +151,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -230,6 +232,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile ExecutorFactory executorFactory;
private volatile ExecutorService ioExecutorPool;
/**
* This is a thread pool for io tasks only.
* We can't use the same global executor to avoid starvations.
*/
private volatile ExecutorFactory ioExecutorFactory;
private final HierarchicalRepository<Set<Role>> securityRepository;
private volatile ResourceManager resourceManager;
@ -859,17 +869,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
if (threadPool != null && !threadPoolSupplied) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool);
for (Runnable r : threadPool.shutdownNow()) {
logger.debug("Cancelled the execution of " + r);
}
}
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
}
shutdownPool(threadPool);
}
if (ioExecutorPool != null) {
shutdownPool(ioExecutorPool);
}
if (!threadPoolSupplied)
@ -950,6 +954,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private void shutdownPool(ExecutorService executorService) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool);
for (Runnable r : executorService.shutdownNow()) {
logger.debug("Cancelled the execution of " + r);
}
}
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
}
}
public boolean checkLiveIsNotColocated(String nodeId) {
if (parentServer == null) {
return true;
@ -1618,6 +1636,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return executorFactory;
}
@Override
public ExecutorFactory getIOExecutorFactory() {
return ioExecutorFactory;
}
@Override
public void setGroupingHandler(final GroupingHandler groupingHandler) {
if (this.groupingHandler != null && managementService != null) {
@ -1752,10 +1775,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO);
return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
} else {
// Default to File Based Storage Manager, (Legacy default configuration).
return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO);
return new JournalStorageManager(configuration, executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
}
}
return new NullStorageManager();
@ -1805,10 +1828,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
}
});
if (configuration.getThreadPoolMaxSize() == -1) {
threadPool = Executors.newCachedThreadPool(tFactory);
threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
} else {
threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory);
threadPool = new ActiveMQThreadPoolExecutor(0, configuration.getThreadPoolMaxSize(), 60L, TimeUnit.SECONDS, tFactory);
}
} else {
threadPool = serviceRegistry.getExecutorService();
@ -1816,6 +1840,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
this.executorFactory = new OrderedExecutorFactory(threadPool);
if (serviceRegistry.getIOExecutorService() != null) {
this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
} else {
ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-IO-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
}
});
this.ioExecutorPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), tFactory);
this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool);
}
/* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this
* Scheduled ExecutorService otherwise we create a new one.
*/

View File

@ -867,8 +867,8 @@ public class QueueImpl implements Queue {
}
@Override
public TotalQueueIterator totalIterator() {
return new TotalQueueIterator();
public QueueBrowserIterator browserIterator() {
return new QueueBrowserIterator();
}
@Override
@ -2863,17 +2863,23 @@ public class QueueImpl implements Queue {
//Readonly (no remove) iterator over the messages in the queue, in order of
//paging store, intermediateMessageReferences and MessageReferences
private class TotalQueueIterator implements LinkedListIterator<MessageReference> {
private class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
LinkedListIterator<PagedReference> pageIter = null;
LinkedListIterator<PagedReference> pagingIterator = null;
LinkedListIterator<MessageReference> messagesIterator = null;
private LinkedListIterator<PagedReference> getPagingIterator() {
if (pagingIterator == null) {
pagingIterator = pageSubscription.iterator(true);
}
return pagingIterator;
}
Iterator lastIterator = null;
private TotalQueueIterator() {
if (pageSubscription != null) {
pageIter = pageSubscription.iterator();
}
MessageReference cachedNext = null;
private QueueBrowserIterator() {
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
}
@ -2883,9 +2889,9 @@ public class QueueImpl implements Queue {
lastIterator = messagesIterator;
return true;
}
if (pageIter != null) {
if (pageIter.hasNext()) {
lastIterator = pageIter;
if (getPagingIterator() != null) {
if (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
return true;
}
}
@ -2893,16 +2899,37 @@ public class QueueImpl implements Queue {
return false;
}
@Override
public MessageReference next() {
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
return msg;
if (cachedNext != null) {
try {
return cachedNext;
} finally {
cachedNext = null;
}
}
if (pageIter != null) {
if (pageIter.hasNext()) {
lastIterator = pageIter;
return pageIter.next();
while (true) {
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
if (msg.isPaged()) {
System.out.println("** Rejecting because it's paged " + msg.getMessage());
continue;
}
// System.out.println("** Returning because it's not paged " + msg.getMessage());
return msg;
} else {
break;
}
}
if (getPagingIterator() != null) {
if (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
MessageReference ref = getPagingIterator().next();
return ref;
}
}
@ -2922,8 +2949,8 @@ public class QueueImpl implements Queue {
@Override
public void close() {
if (pageIter != null) {
pageIter.close();
if (getPagingIterator() != null) {
getPagingIterator().close();
}
if (messagesIterator != null) {
messagesIterator.close();

View File

@ -165,7 +165,7 @@ public class ScaleDownHandler {
for (Queue loopQueue : queues) {
logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator()) {
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
while (messagesIterator.hasNext()) {
MessageReference messageReference = messagesIterator.next();
@ -249,7 +249,7 @@ public class ScaleDownHandler {
for (Queue queue : queues) {
// using auto-closeable
try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> messagesIterator = queue.browserIterator()) {
// loop through every message of this queue
while (messagesIterator.hasNext()) {
MessageReference messageRef = messagesIterator.next();

View File

@ -206,7 +206,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
messageQueue.addConsumer(this);
}

View File

@ -41,6 +41,8 @@ public class ServiceRegistryImpl implements ServiceRegistry {
private ExecutorService executorService;
private ExecutorService ioExecutorService;
private ScheduledExecutorService scheduledExecutorService;
/* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
@ -162,6 +164,16 @@ public class ServiceRegistryImpl implements ServiceRegistry {
return transformer;
}
@Override
public ExecutorService getIOExecutorService() {
return ioExecutorService;
}
@Override
public void setIOExecutorService(ExecutorService ioExecutorService) {
this.ioExecutorService = ioExecutorService;
}
@Override
public void addBridgeTransformer(String name, Transformer transformer) {
bridgeTransformers.put(name, transformer);

View File

@ -1212,7 +1212,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public LinkedListIterator<MessageReference> totalIterator() {
public LinkedListIterator<MessageReference> browserIterator() {
return null;
}

View File

@ -129,7 +129,7 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
@ -467,7 +467,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
public void destroyTables(List<String> tableNames) throws Exception {
Driver driver = JDBCUtils.getDriver(getJDBCClassName());
Driver driver = getDriver(getJDBCClassName());
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
Statement statement = connection.createStatement();
try {
@ -490,6 +490,30 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
private Driver getDriver(String className) throws Exception {
try {
Driver driver = (Driver) Class.forName(className).newInstance();
// Shutdown the derby if using the derby embedded driver.
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
}
}
});
}
return driver;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + className);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate driver class: ", e);
}
}
protected Map<String, Object> generateInVMParams(final int node) {
Map<String, Object> params = new HashMap<>();

View File

@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(BMUnitRunner.class)
public class ClosingConnectionTest extends ActiveMQTestBase {
public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
private ServerLocator locator;
private ActiveMQServer server;
private static MBeanServer mBeanServer;
private static boolean readyToKill = false;
protected boolean isNetty() {
return true;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
mBeanServer = MBeanServerFactory.createMBeanServer();
server = newActiveMQServer();
server.getConfiguration().setJournalType(JournalType.NIO);
server.getConfiguration().setJMXManagementEnabled(true);
server.start();
waitForServerToStart(server);
locator = createFactory(isNetty());
readyToKill = false;
}
public static void killConnection() throws InterruptedException {
if (readyToKill) {
// We have to kill the connection in a new thread otherwise Netty won't interrupt the current thread
Thread closeConnectionThread = new Thread(new Runnable() {
@Override
public void run() {
try {
ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mBeanServer);
serverControl.closeConnectionsForUser("guest");
readyToKill = false;
} catch (Exception e) {
e.printStackTrace();
}
}
});
closeConnectionThread.start();
try {
/* We want to simulate a long-running remoting thread here. If closing the connection in the closeConnectionThread
* interrupts this thread then it will cause sleep() to throw and InterruptedException. Therefore we catch
* the InterruptedException and re-interrupt the current thread so the interrupt will be passed properly
* back to the caller. It's a bit of a hack, but I couldn't find any other way to simulate it.
*/
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/*
* Test for https://bugzilla.redhat.com/show_bug.cgi?id=1193085
* */
@Test
@BMRules(rules = {@BMRule(
name = "rule to kill connection",
targetClass = "org.apache.activemq.artemis.core.io.nio.NIOSequentialFile",
targetMethod = "open(int, boolean)",
targetLocation = "AT INVOKE java.nio.channels.FileChannel.size()",
action = "org.apache.activemq.artemis.tests.extras.byteman.ClosingConnectionTest.killConnection();")})
public void testKillConnection() throws Exception {
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession("guest", null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
for (int i = 0; i < 200; i++) {
producer.send(message);
}
assertTrue(server.locateQueue(ADDRESS).getPageSubscription().getPagingStore().isPaging());
readyToKill = true;
try {
for (int i = 0; i < 10; i++) {
producer.send(message);
}
fail("Sending message here should result in failure.");
} catch (Exception e) {
IntegrationTestLogger.LOGGER.info("Caught exception: " + e.getMessage());
}
Thread.sleep(1000);
assertTrue(server.isStarted());
session.close();
}
private ActiveMQServer newActiveMQServer() throws Exception {
ActiveMQServer server = createServer(true, createDefaultConfig(isNetty()));
server.setMBeanServer(mBeanServer);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
}
}

View File

@ -50,7 +50,7 @@ public class JMSBridgeReconnectionTest extends BridgeTestBase {
targetClass = "org.apache.activemq.artemis.core.client.impl.ClientProducerImpl",
targetMethod = "sendRegularMessage",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($1,$2,$3);")})
action = "org.apache.activemq.artemis.tests.extras.byteman.JMSBridgeReconnectionTest.pause2($2,$3,$4);")})
public void performCrashDestinationStopBridge() throws Exception {
activeMQServer = jmsServer1;
ConnectionFactoryFactory factInUse0 = cff0;

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@ -92,10 +93,13 @@ public class PagingLeakTest extends ActiveMQTestBase {
positions.clear();
timeout = System.currentTimeMillis() + 5000;
while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) {
forceGC();
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
forceGC();
return pagePosInstances.get() == 0;
}
}, 5000, 100);
// This is just to validate the rules are correctly applied on byteman
assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get());
@ -110,7 +114,7 @@ public class PagingLeakTest extends ActiveMQTestBase {
server.start();
AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(10 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getAddressSettingsRepository().addMatch("#", settings);

View File

@ -104,12 +104,12 @@ public class ProducerTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer();
for (int i = 0; i < 62; i++) {
if (i == 61) {
if (i == 30) {
// the point where the send would block
latch.countDown();
}
ClientMessage msg = session.createMessage(false);
msg.getBodyBuffer().writeBytes(new byte[1024]);
msg.getBodyBuffer().writeBytes(new byte[2048]);
producer.send(QUEUE, msg);
}
} catch (Exception e) {
@ -119,7 +119,7 @@ public class ProducerTest extends ActiveMQTestBase {
};
t.start();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(latch.await(10, TimeUnit.SECONDS));
session.close();
t.join(5000);

View File

@ -93,7 +93,7 @@ public class BackupSyncJournalTest extends FailoverTestBase {
@Test
public void testReserveFileIdValuesOnBackup() throws Exception {
final int totalRounds = 50;
final int totalRounds = 5;
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < totalRounds; i++) {

View File

@ -713,6 +713,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.testCompact();
}
journal.flush();
stopJournal();
createJournal();
startJournal();
@ -730,7 +732,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
@Test
public void testCompactAddAndUpdateFollowedByADelete() throws Exception {
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
@ -779,7 +780,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
createJournal();
startJournal();
loadAndCheck();
}
@Test
@ -1610,8 +1610,9 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
@Test
public void testStressDeletesNoSync() throws Exception {
public void testStressDeletesNoSync() throws Throwable {
Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0);
final AtomicInteger errors = new AtomicInteger(0);
@ -1622,121 +1623,147 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService ioexecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
OrderedExecutorFactory iofactory = new OrderedExecutorFactory(ioexecutor);
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final JournalStorageManager storage = new JournalStorageManager(config, factory);
final JournalStorageManager storage = new JournalStorageManager(config, factory, iofactory);
storage.start();
storage.loadInternalOnly();
((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false);
final LinkedList<Long> survivingMsgs = new LinkedList<>();
try {
storage.loadInternalOnly();
Runnable producerRunnable = new Runnable() {
@Override
public void run() {
try {
while (running.get()) {
final long[] values = new long[100];
long tx = seqGenerator.incrementAndGet();
((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false);
final LinkedList<Long> survivingMsgs = new LinkedList<>();
OperationContextImpl ctx = new OperationContextImpl(executor);
storage.setContext(ctx);
Runnable producerRunnable = new Runnable() {
@Override
public void run() {
try {
while (running.get()) {
final long[] values = new long[100];
long tx = seqGenerator.incrementAndGet();
for (int i = 0; i < 100; i++) {
long id = seqGenerator.incrementAndGet();
values[i] = id;
OperationContextImpl ctx = new OperationContextImpl(executor);
storage.setContext(ctx);
ServerMessageImpl message = new ServerMessageImpl(id, 100);
for (int i = 0; i < 100; i++) {
long id = seqGenerator.incrementAndGet();
values[i] = id;
message.getBodyBuffer().writeBytes(new byte[1024]);
ServerMessageImpl message = new ServerMessageImpl(id, 100);
storage.storeMessageTransactional(tx, message);
}
ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
message.getBodyBuffer().writeBytes(new byte[1024]);
survivingMsgs.add(message.getMessageID());
// This one will stay here forever
storage.storeMessage(message);
storage.commit(tx);
ctx.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
storage.storeMessageTransactional(tx, message);
}
ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
@Override
public void done() {
deleteExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (long messageID : values) {
storage.deleteMessage(messageID);
survivingMsgs.add(message.getMessageID());
// This one will stay here forever
storage.storeMessage(message);
storage.commit(tx);
ctx.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
deleteExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (long messageID : values) {
storage.deleteMessage(messageID);
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
});
}
});
}
});
}
});
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
};
Runnable compressRunnable = new Runnable() {
@Override
public void run() {
try {
while (running.get()) {
Thread.sleep(500);
System.out.println("Compacting");
((JournalImpl) storage.getMessageJournal()).testCompact();
((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
Runnable compressRunnable = new Runnable() {
@Override
public void run() {
try {
while (running.get()) {
Thread.sleep(500);
System.out.println("Compacting");
((JournalImpl) storage.getMessageJournal()).testCompact();
((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
};
Thread producerThread = new Thread(producerRunnable);
producerThread.start();
Thread compactorThread = new Thread(compressRunnable);
compactorThread.start();
Thread.sleep(1000);
running.set(false);
producerThread.join();
compactorThread.join();
deleteExecutor.shutdown();
assertTrue("delete executor failted to terminate", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
storage.stop();
executor.shutdown();
assertTrue("executor failed to terminate", executor.awaitTermination(30, TimeUnit.SECONDS));
ioexecutor.shutdown();
assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
storage.stop();
} catch (Exception e) {
e.printStackTrace();
}
};
Thread producerThread = new Thread(producerRunnable);
producerThread.start();
executor.shutdownNow();
deleteExecutor.shutdownNow();
ioexecutor.shutdownNow();
}
Thread compactorThread = new Thread(compressRunnable);
compactorThread.start();
Thread.sleep(1000);
running.set(false);
producerThread.join();
compactorThread.join();
storage.stop();
executor.shutdown();
assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
deleteExecutor.shutdown();
assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
}
@Override

View File

@ -144,18 +144,21 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir);
journal.start();
Loader loadTest = new Loader(numberOfRecords);
journal.load(loadTest);
Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
Assert.assertEquals(0, loadTest.numberOfPreparedTransactions);
Assert.assertEquals(0, loadTest.numberOfUpdates);
Assert.assertEquals(0, loadTest.numberOfDeletes);
try {
Loader loadTest = new Loader(numberOfRecords);
journal.load(loadTest);
Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
Assert.assertEquals(0, loadTest.numberOfPreparedTransactions);
Assert.assertEquals(0, loadTest.numberOfUpdates);
Assert.assertEquals(0, loadTest.numberOfDeletes);
journal.stop();
if (loadTest.ex != null) {
throw loadTest.ex;
if (loadTest.ex != null) {
throw loadTest.ex;
}
} finally {
journal.stop();
}
}
// Inner classes -------------------------------------------------
@ -311,6 +314,8 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
throw e;
}
journal.flush();
return journal;
}

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -265,12 +266,18 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
ClientConsumer consumer = receiveClientSession.createConsumer(name);
final ClientConsumer consumer = receiveClientSession.createConsumer(name);
Assert.assertFalse(consumer.isClosed());
checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
serverControl.destroyQueue(name.toString(), true);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return consumer.isClosed();
}
}, 1000, 100);
Assert.assertTrue(consumer.isClosed());
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));

View File

@ -308,7 +308,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception {
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
Set<String> messageOrderSet = new HashSet<>();
@ -344,7 +344,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
protected int processCountThroughIterator(Queue queue) throws Exception {
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
int count = 0;
while (pageIterator.hasNext()) {

View File

@ -3335,7 +3335,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = null;
for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024];
byte[] body = new byte[2048];
message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
@ -3360,7 +3360,7 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize());
for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024];
byte[] body = new byte[2048];
message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
@ -3385,7 +3385,7 @@ public class PagingTest extends ActiveMQTestBase {
producer = session.createProducer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++) {
byte[] body = new byte[1024];
byte[] body = new byte[2048];
message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
@ -3841,7 +3841,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024);
server = createServer(true, config, 512 * 1024, 1024 * 1024);
server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2);
server.start();
@ -4745,7 +4745,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true);
int biggerMessageSize = 1024;
int biggerMessageSize = 2048;
byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) {
@ -4817,7 +4817,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage message = session.createMessage(true);
int biggerMessageSize = 1024;
int biggerMessageSize = 2048;
byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) {

View File

@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@Override
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
return new JournalStorageManager(configuration, execFactory) {
return new JournalStorageManager(configuration, execFactory, execFactory) {
@Override
public void deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID);

View File

@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
PostOffice postOffice = new FakePostOffice();
final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, execFactory);
try {

View File

@ -137,7 +137,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, execFactory);
addActiveMQComponent(jsm);
return jsm;
}
@ -146,7 +146,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration
*/
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, execFactory, scheduledExecutorService);
addActiveMQComponent(jsm);
return jsm;
}
@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected void createJMSStorage() throws Exception {
jmsJournal = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
addActiveMQComponent(jmsJournal);
jmsJournal.start();
jmsJournal.load();

View File

@ -435,7 +435,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
* @throws Exception
*/
private JournalStorageManager getStorage() throws Exception {
return new JournalStorageManager(createDefaultInVMConfig(), factory);
return new JournalStorageManager(createDefaultInVMConfig(), factory, factory);
}
/**
@ -650,6 +650,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public void flush() throws Exception {
}
@Override
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {

View File

@ -44,6 +44,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
public void setup() throws Exception {
serviceRegistry = new ServiceRegistryImpl();
serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
serviceRegistry.setIOExecutorService(Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
server.start();
@ -58,6 +59,7 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
}
serviceRegistry.getExecutorService().shutdown();
serviceRegistry.getScheduledExecutorService().shutdown();
serviceRegistry.getIOExecutorService().shutdown();
super.tearDown();
}

View File

@ -434,9 +434,6 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
Assert.assertEquals(2, factory.listFiles("tt").size());
}
@Test
@ -532,6 +529,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
journalImpl.debugWait();
System.out.println("Files = " + factory.listFiles("tt"));
SequentialFile file = factory.createSequentialFile("tt-1.tt");
@ -598,6 +597,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(2L, false);
journalImpl.debugWait();
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@ -697,6 +698,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
journalImpl.debugWait();
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@ -936,11 +939,9 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.forceMoveNextFile();
// Reclaiming should still be able to reclaim a file if a transaction was
// ignored
// Reclaiming should still be able to reclaim a file if a transaction was ignored
journalImpl.checkReclaimStatus();
Assert.assertEquals(2, factory.listFiles("tt").size());
journalImpl.flush();
}
@ -1109,7 +1110,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
}
@Test
public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception {
public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception {
testReclaimingAfterConcurrentAddsAndDeletes(true);
}
@Test
public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception {
testReclaimingAfterConcurrentAddsAndDeletes(false);
}
public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception {
final int JOURNAL_SIZE = 10 * 1024;
setupAndLoadJournal(JOURNAL_SIZE, 1);
@ -1131,8 +1141,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
latchReady.countDown();
ActiveMQTestBase.waitForLatch(latchStart);
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) {
journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
if (transactional) {
journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
} else {
journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false);
}
queueDelete.offer(i);
}
finishedOK.incrementAndGet();
@ -1153,7 +1169,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
if (toDelete == null) {
break;
}
journalImpl.appendDeleteRecord(toDelete, false);
if (transactional) {
journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
} else {
journalImpl.appendDeleteRecord(toDelete, false);
}
}
finishedOK.incrementAndGet();
} catch (Exception e) {

View File

@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0));
}
journalImpl.debugWait();
latch.countDown();
factory.setHoldCallbacks(false, null);
if (isCommit) {
@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
}
}
// If a callback error already arrived, we should just throw the exception
// right away
// If a callback error already arrived, we should just throw the exception right away
@Test
public void testPreviousError() throws Exception {
final int JOURNAL_SIZE = 20000;
@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0));
journalImpl.debugWait();
factory.flushAllCallbacks();
factory.setGenerateErrors(false);
@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase {
try {
journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
Assert.fail("Exception expected"); // An exception already happened in one
// of the elements on this transaction.
// We can't accept any more elements on
// the transaction
Assert.fail("Exception expected");
// An exception already happened in one of the elements on this transaction.
// We can't accept any more elements on the transaction
} catch (Exception ignored) {
}
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
@ -61,12 +62,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
} catch (IllegalStateException e) {
// OK
}
try {
stopJournal();
Assert.fail("Should throw exception");
} catch (IllegalStateException e) {
// OK
}
startJournal();
try {
startJournal();
@ -439,7 +434,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
/**
* Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize, numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN);
*/
private int calculateNumberOfFiles(final int fileSize, final int alignment, final int... record) throws Exception {
private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception {
if (journal != null) {
journal.flush();
}
int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
int currentPosition = headerSize;
int totalFiles = 0;
@ -489,7 +487,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
int numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
int numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 91, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@ -512,7 +510,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 95, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@ -533,7 +531,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
numberOfFiles = calculateNumberOfFiles(10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
numberOfFiles = calculateNumberOfFiles(journal, 10 * 1024, journal.getAlignment(), 200, JournalImpl.SIZE_ADD_RECORD + recordLength);
Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
@ -646,14 +644,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testCalculations() throws Exception {
Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 1, 1, 10, 2, 20));
Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 1, 1));
Assert.assertEquals(0, calculateNumberOfFiles(10 * 1024, 512, 19, 10));
Assert.assertEquals(1, calculateNumberOfFiles(10 * 1024, 512, 20, 10));
Assert.assertEquals(0, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 500));
Assert.assertEquals(1, calculateNumberOfFiles(3000, 500, 2, 1000, 1, 1000));
Assert.assertEquals(9, calculateNumberOfFiles(10240, 1, 90, 1038, 45, 10));
Assert.assertEquals(11, calculateNumberOfFiles(10 * 1024, 512, 60, 14 + 1024, 30, 14));
Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 1, 1, 10, 2, 20));
Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 1, 1));
Assert.assertEquals(0, calculateNumberOfFiles(journal, 10 * 1024, 512, 19, 10));
Assert.assertEquals(1, calculateNumberOfFiles(journal, 10 * 1024, 512, 20, 10));
Assert.assertEquals(0, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 500));
Assert.assertEquals(1, calculateNumberOfFiles(journal, 3000, 500, 2, 1000, 1, 1000));
Assert.assertEquals(9, calculateNumberOfFiles(journal, 10240, 1, 90, 1038, 45, 10));
Assert.assertEquals(11, calculateNumberOfFiles(journal, 10 * 1024, 512, 60, 14 + 1024, 30, 14));
}
@Test
@ -862,13 +860,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
addTx(1, i);
}
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files2 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files2.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -879,13 +877,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Make sure nothing reclaimed
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 100, recordLength) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -898,13 +896,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
updateTx(1, i);
}
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -915,7 +913,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
// Make sure nothing reclaimed
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
@ -934,14 +932,14 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
deleteTx(1, i);
}
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files7 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files7.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -950,13 +948,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
checkAndReclaimFiles();
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
List<String> files8 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX) + 2, files8.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -977,13 +975,13 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
add(i);
}
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(10, journal.getIDMapSize());
List<String> files9 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX, 1, JournalImpl.SIZE_COMMIT_RECORD, 10, JournalImpl.SIZE_ADD_RECORD + recordLength) + 2, files9.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
for (String file : files1) {
@ -1458,7 +1456,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(3, files2.size());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@ -1467,10 +1465,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files3 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@ -1478,10 +1476,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_COMMIT_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@ -1549,10 +1547,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
rollback(1); // in file 1
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@ -1560,10 +1558,10 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
List<String> files4 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength, 1, JournalImpl.SIZE_ROLLBACK_RECORD + 1, 1, JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
@ -1669,7 +1667,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
Assert.assertEquals(3, files2.size());
Assert.assertEquals(calculateNumberOfFiles(fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(calculateNumberOfFiles(journal, fileSize, journal.getAlignment(), 2, recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());

View File

@ -70,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
factory = new OrderedExecutorFactory(executor);
}
@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
journal = new JournalStorageManager(configuration, factory);
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.stop();
journal = new JournalStorageManager(configuration, factory);
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
mapDups.clear();
journal = new JournalStorageManager(configuration, factory);
journal = new JournalStorageManager(configuration, factory, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@ -146,6 +146,8 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
values = mapDups.get(ADDRESS);
Assert.assertEquals(10, values.size());
scheduledThreadPool.shutdown();
} finally {
if (journal != null) {
try {

View File

@ -604,7 +604,7 @@ public class FakeQueue implements Queue {
}
@Override
public LinkedListIterator<MessageReference> totalIterator() {
public LinkedListIterator<MessageReference> browserIterator() {
// TODO Auto-generated method stub
return null;
}

View File

@ -1274,7 +1274,7 @@ public class QueueImplTest extends ActiveMQTestBase {
locator.close();
Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue();
LinkedListIterator<MessageReference> totalIterator = queue.totalIterator();
LinkedListIterator<MessageReference> totalIterator = queue.browserIterator();
try {
int i = 0;