diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 0e67dbc572..7d30c4bf06 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -48,7 +49,6 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.jboss.logging.Logger; public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @@ -74,6 +74,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private boolean started; + private AtomicBoolean failed = new AtomicBoolean(false); + private JDBCJournalSync syncTimer; private final Executor completeExecutor; @@ -161,8 +163,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } public synchronized int sync() { - if (!started) - return 0; List recordRef; synchronized (records) { @@ -173,6 +173,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { records.clear(); } + if (!started || failed.get()) { + executeCallbacks(recordRef, false); + return 0; + } + + // We keep a list of deleted records and committed tx (used for cleaning up old transaction data). List deletedRecords = new ArrayList<>(); List committedTransactions = new ArrayList<>(); @@ -183,6 +189,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { connection.setAutoCommit(false); for (JDBCJournalRecord record : recordRef) { + + if (logger.isTraceEnabled()) { + logger.trace("sync::preparing JDBC statment for " + record); + } + + + switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: // Standard SQL Delete Record, Non transactional delete @@ -217,6 +230,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { deleteJournalTxRecords.executeBatch(); connection.commit(); + if (logger.isTraceEnabled()) { + logger.trace("JDBC commit worked"); + } cleanupTxRecords(deletedRecords, committedTransactions); executeCallbacks(recordRef, true); @@ -224,14 +240,38 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { return recordRef.size(); } catch (Exception e) { - criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); - started = false; - executeCallbacks(recordRef, false); - performRollback(recordRef); + handleException(recordRef, e); return 0; } } + /** public for tests only, not through API */ + public void handleException(List recordRef, Throwable e) { + logger.warn(e.getMessage(), e); + failed.set(true); + criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); + + if (logger.isTraceEnabled()) { + logger.trace("Rolling back Transaction, just in case"); + } + + try { + connection.rollback(); + } catch (Throwable rollback) { + logger.warn(rollback); + } + + try { + connection.close(); + } catch (Throwable rollback) { + logger.warn(rollback); + } + + if (recordRef != null) { + executeCallbacks(recordRef, false); + } + } + /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ private synchronized void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { @@ -263,45 +303,49 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } - private void performRollback(List records) { - try { - for (JDBCJournalRecord record : records) { - if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { - removeTxRecord(record); - } - } - - List txHolders = new ArrayList<>(); - txHolders.addAll(transactions.values()); - - // On rollback we must update the tx map to remove all the tx entries - for (TransactionHolder txH : txHolders) { - if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { - transactions.remove(txH.transactionID); - } - } - connection.rollback(); - } catch (Exception sqlE) { - logger.error(sqlE.getMessage(), sqlE); - criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null); - ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); - } - } - - private void executeCallbacks(final List records, final boolean result) { + private void executeCallbacks(final List records, final boolean success) { Runnable r = new Runnable() { @Override public void run() { for (JDBCJournalRecord record : records) { - record.complete(result); + if (logger.isTraceEnabled()) { + logger.trace("Calling callback " + record + " with success = " + success); + } + record.complete(success); } } }; completeExecutor.execute(r); } + + private void checkStatus() { + checkStatus(null); + } + + private void checkStatus(IOCompletion callback) { + if (!started) { + if (callback != null) callback.onError(-1, "JDBC Journal is not loaded"); + throw new IllegalStateException("JDBCJournal is not loaded"); + } + + if (failed.get()) { + if (callback != null) callback.onError(-1, "JDBC Journal failed"); + throw new IllegalStateException("JDBCJournal Failed"); + } + } + + private void appendRecord(JDBCJournalRecord record) throws Exception { + // extra measure I know, as all the callers are also checking for this.. + // better to be safe ;) + checkStatus(); + + if (logger.isTraceEnabled()) { + logger.trace("appendRecord " + record); + } + record.storeLineUp(); if (!started) { if (record.getIoCompletion() != null) { @@ -330,6 +374,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private synchronized void addTxRecord(JDBCJournalRecord record) { + + if (logger.isTraceEnabled()) { + logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed); + } + + checkStatus(); + TransactionHolder txHolder = transactions.get(record.getTxId()); if (txHolder == null) { txHolder = new TransactionHolder(record.getTxId()); @@ -349,28 +400,22 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } - private synchronized void removeTxRecord(JDBCJournalRecord record) { - TransactionHolder txHolder = transactions.get(record.getTxId()); - - // We actually only need the record ID in this instance. - if (record.isTransactional()) { - RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount()); - if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) { - txHolder.recordsToDelete.remove(info); - } else { - txHolder.recordInfos.remove(info); - } - } else { - txHolder.prepared = false; - } - } - @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord bytes[] " + r); + } + + + appendRecord(r); } @@ -380,6 +425,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setUserRecordType(recordType); r.setRecord(persister, record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord (encoding) " + r + " with record = " + record); + } + + appendRecord(r); } @@ -390,20 +441,36 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { Object record, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord (completionCallback & encoding) " + r + " with record = " + record); + } + + appendRecord(r); } @Override public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (bytes)) " + r); + } + + appendRecord(r); } @@ -413,6 +480,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setUserRecordType(recordType); r.setRecord(persister, record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (encoding)) " + r + " with record " + record); + } + + appendRecord(r); } @@ -423,36 +496,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { Object record, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (encoding & completioncallback)) " + r + " with record " + record); + } + + appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord id=" + id + " sync=" + sync); + } + appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord id=" + id + " sync=" + sync + " with completionCallback"); + } + + appendRecord(r); } @Override public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); appendRecord(r); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using bytes[] r=" + r); + } + + } @Override @@ -465,15 +569,29 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setUserRecordType(recordType); r.setRecord(persister, record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + + appendRecord(r); } @Override public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r); + } + + appendRecord(r); } @@ -487,46 +605,88 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setUserRecordType(recordType); r.setRecord(persister, record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r); + } + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(EncoderPersister.getInstance(), record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id); + } + appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " sync=" + sync); + } + + appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " callback=" + callback); + } + appendRecord(r); } @@ -535,20 +695,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " using callback, lineup=" + lineUpContext); + } + appendRecord(r); } @Override public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " using sync=" + sync); + } + + appendRecord(r); } @@ -557,43 +732,74 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setTxData(transactionData); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " using callback, sync=" + sync); + } + + appendRecord(r); } @Override public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " transactionData, sync=" + sync); + } + + appendRecord(r); } @Override public void appendRollbackRecord(long txID, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync); + } + appendRecord(r); } @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync + " using callback"); + } + + appendRecord(r); } @Override - public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception { + public synchronized JournalLoadInformation load(LoaderCallback reloadManager) { JournalLoadInformation jli = new JournalLoadInformation(); JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager); JDBCJournalRecord r; @@ -643,6 +849,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setNumberOfRecords(noRecords); transactions = jrc.getTransactions(); + } catch (Throwable e) { + handleException(null, e); } return jli; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index a33888dc40..6750da110e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -306,4 +306,23 @@ class JDBCJournalRecord { long getSeq() { return seq; } + + @Override + public String toString() { + return "JDBCJournalRecord{" + + "compactCount=" + compactCount + + ", id=" + id + + ", isTransactional=" + isTransactional + + ", isUpdate=" + isUpdate + + ", recordType=" + recordType + + ", seq=" + seq + + ", storeLineUp=" + storeLineUp + + ", sync=" + sync + + ", txCheckNoRecords=" + txCheckNoRecords + + ", txDataSize=" + txDataSize + + ", txId=" + txId + + ", userRecordType=" + userRecordType + + ", variableSize=" + variableSize + + '}'; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 112cc46fd6..12e2b35f3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -122,12 +122,10 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); } else { String driverClassName = dbConf.getJdbcDriverClassName(); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); } pagingFactoryFileFactory.start(); started = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 92cae64fd1..385376e123 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -614,13 +614,17 @@ public class ServerSessionPacketHandler implements ChannelHandler { doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::response sent::" + response); + logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); } } @Override public void done() { + if (logger.isTraceEnabled()) { + logger.trace("ServerSessionPacketHandler::regular response sent::" + response); + } + doConfirmAndResponse(confirmPacket, response, flush, closeChannel); } }); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java new file mode 100644 index 0000000000..cf2815c677 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ShutdownServerTest extends ActiveMQTestBase { + + private ActiveMQServer server; + + private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue"); + + private ServerLocator locator; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + server = createServer(true, createDefaultJDBCConfig(false), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES); + server.start(); + + locator = createFactory(false); + } + + @Test + public void testShutdownServer() throws Throwable { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, false); + + session.createQueue(QUEUE, QUEUE, null, true); + + ClientConsumer consumer = session.createConsumer(QUEUE); + + ClientProducer producer = session.createProducer(QUEUE); + ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); + message.getBodyBuffer().writeString("hi"); + message.putStringProperty("hello", "elo"); + producer.send(message); + + ActiveMQServerImpl impl = (ActiveMQServerImpl) server; + JournalStorageManager journal = (JournalStorageManager) impl.getStorageManager(); + JDBCJournalImpl journalimpl = (JDBCJournalImpl) journal.getMessageJournal(); + journalimpl.handleException(null, new Exception("failure")); + + Wait.waitFor(() -> !server.isStarted()); + + Assert.assertFalse(server.isStarted()); + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index 510452e743..07cacf241b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -95,7 +95,7 @@ public class BasicXaTest extends ActiveMQTestBase { configuration = createDefaultNettyConfig(); } - messagingService = createServer(false, configuration, -1, -1, addressSettings); + messagingService = createServer(true, configuration, -1, -1, addressSettings); // start the server messagingService.start();