diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 9120d79618..7cec2e46f4 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -249,6 +249,12 @@ public enum ActiveMQExceptionType { public ActiveMQException createException(String msg) { return new ActiveMQNullRefException(msg); } + }, + SHUTDOWN_ERROR(219) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQShutdownException(msg); + } }; private static final Map TYPE_MAP; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java new file mode 100644 index 0000000000..03797a8273 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because an address exists on the server. + */ +public final class ActiveMQShutdownException extends ActiveMQException { + + public ActiveMQShutdownException() { + super(ActiveMQExceptionType.SHUTDOWN_ERROR); + } + + public ActiveMQShutdownException(String msg) { + super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index ab9888e452..711d7cec4f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1500,9 +1500,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi XAException xaException = null; if (onePhase) { + logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t); //we must return XA_RMFAIL xaException = new XAException(XAException.XAER_RMFAIL); } else { + logger.debug("Throwing twoFase Retry on xid=" + xid, t); // Any error on commit -> RETRY // We can't rollback a Prepared TX for definition xaException = new XAException(XAException.XA_RETRY); @@ -1753,7 +1755,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } catch (XAException xae) { throw xae; } catch (ActiveMQException e) { - if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) { + if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) { // Unblocked on failover throw new XAException(XAException.XA_RETRY); } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index f997b3cf3d..334bc46f90 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -31,7 +31,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; @@ -334,19 +336,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } - private void checkStatus() { + private void checkStatus() throws Exception { checkStatus(null); } - private void checkStatus(IOCompletion callback) { + private void checkStatus(IOCompletion callback) throws Exception { if (!started) { if (callback != null) callback.onError(-1, "JDBC Journal is not loaded"); - throw new IllegalStateException("JDBCJournal is not loaded"); + throw new ActiveMQShutdownException("JDBCJournal is not loaded"); } if (failed.get()) { if (callback != null) callback.onError(-1, "JDBC Journal failed"); - throw new IllegalStateException("JDBCJournal Failed"); + throw new ActiveMQException("JDBCJournal Failed"); } } @@ -388,7 +390,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { if (callback != null) callback.waitCompletion(); } - private synchronized void addTxRecord(JDBCJournalRecord record) { + private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception { if (logger.isTraceEnabled()) { logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 55b92c5470..30ed6e33e2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; @@ -823,6 +824,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile); } result.set(true); + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendPrepareRecord:" + e, e); } catch (Throwable e) { result.fail(e); setErrorCondition(callback, null, e); @@ -882,7 +886,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } result.set(true); - } catch (Exception e) { + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendUpdateRecord:" + e, e); + } catch (Throwable e) { result.fail(e); setErrorCondition(callback, null, e); logger.error("appendUpdateRecord:" + e, e); @@ -933,7 +940,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal record.delete(usedFile); } result.set(true); - } catch (Exception e) { + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendDeleteRecord:" + e, e); + } catch (Throwable e) { result.fail(e); logger.error("appendDeleteRecord:" + e, e); } finally { @@ -993,7 +1003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.addPositive(usedFile, id, addRecord.getEncodeSize()); - } catch (Exception e) { + } catch (Throwable e) { logger.error("appendAddRecordTransactional:" + e, e); setErrorCondition(null, tx, e); } finally { @@ -1031,9 +1041,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - private void checkJournalIsLoaded() { + private void checkJournalIsLoaded() throws Exception { if (state != JournalState.LOADED && state != JournalState.SYNCING) { - throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]"); + throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]"); } } @@ -1085,7 +1095,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); - } catch ( Exception e ) { + } catch (Throwable e ) { logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); setErrorCondition(null, tx, e ); } finally { @@ -1132,7 +1142,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.addNegative(usedFile, id); - } catch (Exception e) { + } catch (Throwable e) { logger.error("appendDeleteRecordTransactional:" + e, e); setErrorCondition(null, tx, e); } finally { @@ -1185,7 +1195,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } tx.prepare(usedFile); - } catch (Exception e) { + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendPrepareRecord:" + e, e); + } catch (Throwable e) { result.fail(e); logger.error("appendPrepareRecord:" + e, e); setErrorCondition(callback, tx, e); @@ -1267,6 +1280,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.commit(usedFile); + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendCommitRecord:" + e, e); } catch (Throwable e) { result.fail(e); logger.error("appendCommitRecord:" + e, e); @@ -1317,6 +1333,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); tx.rollback(usedFile); + } catch (ActiveMQShutdownException e) { + result.fail(e); + logger.error("appendRollbackRecord:" + e, e); } catch (Throwable e) { result.fail(e); logger.error("appendRollbackRecord:" + e, e); @@ -2360,10 +2379,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return; } - setJournalState(JournalState.STOPPED); - flush(); + setJournalState(JournalState.STOPPED); + if (providedIOThreadPool == null) { threadPool.shutdown(); @@ -2681,6 +2700,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final JournalTransaction tx, final IOCallback parameterCallback) throws Exception { + checkJournalIsLoaded(); + final IOCallback callback; final int size = encoder.getEncodeSize(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 7c821a98f6..d28eec87fd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -126,8 +126,10 @@ import org.jboss.logging.Logger; */ public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager { - private static final int CRITICAL_PATHS = 1; - private static final int CRITICAL_STORE = 0; + protected static final int CRITICAL_PATHS = 3; + protected static final int CRITICAL_STORE = 0; + protected static final int CRITICAL_STOP = 1; + protected static final int CRITICAL_STOP_2 = 2; private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class); @@ -405,6 +407,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp leaveCritical(CRITICAL_STORE); } + /** for internal use and testsuite, don't use it outside of tests */ + public void writeLock() { + storageManagerLock.writeLock().lock(); + } + + /** for internal use and testsuite, don't use it outside of tests */ + public void writeUnlock() { + storageManagerLock.writeLock().unlock(); + } + @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { readLock(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index dd8bb22a7e..867f2d43ec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -229,9 +229,21 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } @Override - public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception { + public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception { + try { + enterCritical(CRITICAL_STOP); + synchronized (this) { + if (internalStop(ioCriticalError, sendFailover)) + return; + } + } finally { + leaveCritical(CRITICAL_STOP); + } + } + + private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception { if (!started) { - return; + return true; } if (!ioCriticalError) { @@ -255,30 +267,41 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // that's ok } - // We cache the variable as the replicator could be changed between here and the time we call stop - // since sendLiveIsStopping may issue a close back from the channel - // and we want to ensure a stop here just in case - ReplicationManager replicatorInUse = replicator; - if (replicatorInUse != null) { - if (sendFailover) { - final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER); - if (token != null) { - try { - token.waitCompletion(5000); - } catch (Exception e) { - // ignore it + enterCritical(CRITICAL_STOP_2); + storageManagerLock.writeLock().lock(); + try { + + // We cache the variable as the replicator could be changed between here and the time we call stop + // since sendLiveIsStopping may issue a close back from the channel + // and we want to ensure a stop here just in case + ReplicationManager replicatorInUse = replicator; + if (replicatorInUse != null) { + if (sendFailover) { + final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER); + if (token != null) { + try { + token.waitCompletion(5000); + } catch (Exception e) { + // ignore it + } } } + // we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown + // while the backup will never receive then + replicatorInUse.stop(false); } - replicatorInUse.stop(); + bindingsJournal.stop(); + + messageJournal.stop(); + + journalLoaded = false; + + started = false; + } finally { + storageManagerLock.writeLock().unlock(); + leaveCritical(CRITICAL_STOP_2); } - bindingsJournal.stop(); - - messageJournal.stop(); - - journalLoaded = false; - - started = false; + return false; } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 15d53116e1..998bbcfde8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -150,6 +150,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon journals[id] = journal; } + /** + * This is for tests basically, do not use it as its API is not guaranteed for future usage. + */ + public void pause() { + started = false; + } + + /** + * This is for tests basically, do not use it as its API is not guaranteed for future usage. + */ + public void resume() { + started = true; + } + @Override public void handlePacket(final Packet packet) { if (logger.isTraceEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index fbf7c6c539..6973706505 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -282,6 +282,10 @@ public final class ReplicationManager implements ActiveMQComponent { @Override public void stop() throws Exception { + stop(true); + } + + public void stop(boolean clearTokens) throws Exception { synchronized (this) { if (!started) { logger.trace("Stopping being ignored as it hasn't been started"); @@ -297,7 +301,10 @@ public final class ReplicationManager implements ActiveMQComponent { enabled = false; writable.set(true); - clearReplicationTokens(); + + if (clearTokens) { + clearReplicationTokens(); + } RemotingConnection toStop = remotingConnection; if (toStop != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java new file mode 100644 index 0000000000..64343e308e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.replication.ReplicationEndpoint; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; +import org.junit.Assert; +import org.junit.Test; + +public class NettyReplicationStopTest extends FailoverTestBase { + + @Override + protected TestableServer createTestableServer(Configuration config) { + return new SameProcessActiveMQServer(createServer(true, config)); + } + + @Override + protected void createConfigs() throws Exception { + createReplicatedConfigs(); + } + + @Override + protected NodeManager createNodeManager() throws Exception { + return new InVMNodeManager(false); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return getNettyAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return getNettyConnectorTransportConfiguration(live); + } + + @Override + protected final void crash(boolean waitFailure, ClientSession... sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + super.crash(waitFailure, sessions); + } + + @Override + protected final void crash(ClientSession... sessions) throws Exception { + crash(true, sessions); + } + + @Test + public void testReplicaStop() throws Exception { + + Map params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1"); + TransportConfiguration tc = createTransportConfiguration(true, false, params); + + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(15); + + final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2); + + ClientSession session = sf.createSession(true, true); + + session.createQueue(ADDRESS, ADDRESS, null, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + final int numMessages = 10; + + ReplicationEndpoint endpoint = backupServer.getServer().getReplicationEndpoint(); + + endpoint.pause(); + + ArrayList threads = new ArrayList<>(); + final ArrayList codesSent = new ArrayList<>(); + + CountDownLatch alignedOnSend = new CountDownLatch(10); + + for (int i = 0; i < numMessages; i++) { + final int code = i; + Thread t = new Thread("WillSend " + code) { + @Override + public void run() { + try { + ClientSession session = sf.createSession(true, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientMessage message = session.createMessage(true).putIntProperty("i", code); + alignedOnSend.countDown(); + System.out.println("blocking!!"); + producer.send(message); + codesSent.add(code); + + System.out.println("Sent!"); + + } catch (Exception e) { + // that's ok; + e.printStackTrace(); // logging just for debug & reference + } + } + }; + + t.start(); + + threads.add(t); + } + + Assert.assertTrue(alignedOnSend.await(10, TimeUnit.SECONDS)); + liveServer.stop(); + + Assert.assertEquals(0, codesSent.size()); + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index 04fd1a98e6..d0d8ce3bc1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -364,6 +365,105 @@ public class BasicXaTest extends ActiveMQTestBase { } + @Test + public void testPrepareError() throws Exception { + Xid xid = newXID(); + + ClientSession clientSession2 = sessionFactory.createSession(false, true, true); + ClientProducer clientProducer = clientSession2.createProducer(atestq); + ClientMessage m1 = createTextMessage(clientSession2, "m1"); + ClientMessage m2 = createTextMessage(clientSession2, "m2"); + ClientMessage m3 = createTextMessage(clientSession2, "m3"); + ClientMessage m4 = createTextMessage(clientSession2, "m4"); + clientProducer.send(m1); + clientProducer.send(m2); + clientProducer.send(m3); + clientProducer.send(m4); + + clientSession.start(xid, XAResource.TMNOFLAGS); + clientSession.start(); + ClientConsumer clientConsumer = clientSession.createConsumer(atestq); + ClientMessage m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m1"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m3"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m4"); + clientSession.end(xid, XAResource.TMSUCCESS); + + StorageManager journalStorageManager = messagingService.getStorageManager(); + + clientSession.prepare(xid); + + journalStorageManager.getMessageJournal().stop(); + try { + clientSession.commit(xid, false); + Assert.fail("Exception exptected"); + } catch (XAException e) { + Assert.assertTrue(e.errorCode == XAException.XA_RETRY); + } + } + + + @Test + public void testRollbackError() throws Exception { + Xid xid = newXID(); + + ClientSession clientSession2 = sessionFactory.createSession(false, true, true); + ClientProducer clientProducer = clientSession2.createProducer(atestq); + ClientMessage m1 = createTextMessage(clientSession2, "m1"); + ClientMessage m2 = createTextMessage(clientSession2, "m2"); + ClientMessage m3 = createTextMessage(clientSession2, "m3"); + ClientMessage m4 = createTextMessage(clientSession2, "m4"); + clientProducer.send(m1); + clientProducer.send(m2); + clientProducer.send(m3); + clientProducer.send(m4); + + clientSession.start(xid, XAResource.TMNOFLAGS); + clientSession.start(); + ClientConsumer clientConsumer = clientSession.createConsumer(atestq); + ClientMessage m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m1"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m3"); + m = clientConsumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m4"); + clientSession.end(xid, XAResource.TMSUCCESS); + + StorageManager journalStorageManager = messagingService.getStorageManager(); + + clientSession.prepare(xid); + + journalStorageManager.getMessageJournal().stop(); + try { + clientSession.rollback(xid); + Assert.fail("Exception exptected"); + } catch (XAException e) { + Assert.assertTrue(e.errorCode == XAException.XA_RETRY); + } + } + @Test public void testReceiveRollback() throws Exception { int numSessions = 100;