ARTEMIS-4480: rationalise openwire session tx usage and operation context usage, use completion callbacks to ensure exclusive consumers are isolated

This commit is contained in:
Gary Tully 2023-10-31 15:53:09 +00:00
parent 6bc7b1bc9f
commit 91fd12ad1f
10 changed files with 691 additions and 81 deletions

View File

@ -682,18 +682,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private void rollbackInProgressLocalTransactions() {
for (Transaction tx : txMap.values()) {
AMQSession session = (AMQSession) tx.getProtocolData();
if (session != null) {
session.getCoreSession().resetTX(tx);
try {
session.getCoreSession().rollback(false);
} catch (Exception expectedOnExistingOutcome) {
} finally {
session.getCoreSession().resetTX(null);
}
} else {
tx.tryRollback();
}
tx.tryRollback();
}
}
@ -729,7 +718,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
for (SessionId sessionId : sessionIdMap.values()) {
AMQSession session = sessions.get(sessionId);
if (session != null) {
session.close();
session.close(fail);
}
}
internalSession.close(false);
@ -782,8 +771,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
recoverOperationContext();
rollbackInProgressLocalTransactions();
if (me != null) {
//filter it like the other protocols
if (!(me instanceof ActiveMQRemoteDisconnectException)) {
@ -796,6 +783,20 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} catch (InvalidClientIDException e) {
logger.warn("Couldn't close connection because invalid clientID", e);
} finally {
// there may be some transactions not associated with sessions
// deal with them after sessions are removed via connection removal
operationContext.executeOnCompletion(new IOCallback() {
@Override
public void done() {
rollbackInProgressLocalTransactions();
}
@Override
public void onError(int errorCode, String errorMessage) {
rollbackInProgressLocalTransactions();
}
});
}
shutdown(true);
}
@ -1338,7 +1339,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
Transaction tx = lookupTX(info.getTransactionId(), null, true);
Transaction tx = lookupTX(info.getTransactionId(), null);
if (tx == null) {
throw new IllegalStateException("Transaction not started, " + info.getTransactionId());
}
final AMQSession amqSession;
if (tx != null) {
@ -1354,11 +1359,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
if (amqSession != null) {
amqSession.getCoreSession().resetTX(tx);
try {
returnReferences(tx, amqSession);
} finally {
amqSession.getCoreSession().resetTX(null);
}
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void beforeRollback(Transaction tx) throws Exception {
returnReferences(tx, amqSession);
}
});
}
}
@ -1406,6 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} else {
if (tx != null) {
// returnReferences() is only interested in acked messages already in the tx, hence we bypass
// getCoreSession().rollback() logic for CORE which would add any prefetched/delivered which have
// not yet been processed by the openwire client.
tx.rollback();
}
}
@ -1490,18 +1499,46 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processBeginTransaction(TransactionInfo info) throws Exception {
final TransactionId txID = info.getTransactionId();
try {
internalSession.resetTX(null);
if (txID.isXATransaction()) {
Xid xid = OpenWireUtil.toXID(txID);
final Xid xid = OpenWireUtil.toXID(txID);
internalSession.xaStart(xid);
final ResourceManager resourceManager = server.getResourceManager();
final Transaction transaction = resourceManager.getTransaction(xid);
transaction.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
removeFromResourceManager();
}
@Override
public void afterRollback(Transaction tx) {
removeFromResourceManager();
}
private void removeFromResourceManager() {
try {
resourceManager.removeTransaction(xid, getRemotingConnection());
} catch (ActiveMQException bestEffort) {
}
}
});
} else {
Transaction transaction = internalSession.newTransaction();
final Transaction transaction = internalSession.newTransaction();
txMap.put(txID, transaction);
transaction.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
removeFromTxMap();
}
@Override
public void afterRollback(Transaction tx) {
removeFromTxMap();
}
private void removeFromTxMap() {
txMap.remove(txID);
}
});
@ -1520,7 +1557,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception {
TransactionId txID = info.getTransactionId();
Transaction tx = lookupTX(txID, null, true);
Transaction tx = lookupTX(txID, null);
if (tx == null) {
throw new IllegalStateException("Transaction not started, " + txID);
}
if (txID.isXATransaction()) {
ResourceManager resourceManager = server.getResourceManager();
@ -1557,7 +1598,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
} else {
if (tx != null) {
tx.commit(onePhase);
AMQSession amqSession = (AMQSession) tx.getProtocolData();
if (amqSession != null) {
amqSession.getCoreSession().resetTX(tx);
amqSession.getCoreSession().commit();
} else {
tx.commit(true);
}
}
}
@ -1630,8 +1677,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
logger.warn("Error during method invocation", e);
throw e;
}
} else {
txMap.remove(txID);
}
return null;
@ -1705,8 +1750,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
sendException(e);
}
throw e;
} finally {
session.getCoreSession().resetTX(null);
}
return null;
@ -1716,6 +1759,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session);
if (ack.getTransactionId() != null && tx == null) {
throw new IllegalStateException("Transaction not started, " + ack.getTransactionId());
}
session.getCoreSession().resetTX(tx);
try {
@ -1725,8 +1773,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
}
} finally {
session.getCoreSession().resetTX(null);
}
return null;
}
@ -1819,21 +1865,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {
return lookupTX(txID, session, false);
}
private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws Exception {
if (txID == null) {
return null;
}
Xid xid = null;
Transaction transaction;
if (txID.isXATransaction()) {
xid = OpenWireUtil.toXID(txID);
transaction = remove ? server.getResourceManager().removeTransaction(xid, this) : server.getResourceManager().getTransaction(xid);
final Xid xid = OpenWireUtil.toXID(txID);
transaction = server.getResourceManager().getTransaction(xid);
} else {
transaction = remove ? txMap.remove(txID) : txMap.get(txID);
transaction = txMap.get(txID);
}
if (transaction == null) {

View File

@ -547,6 +547,14 @@ public class AMQSession implements SessionCallback {
this.coreSession.close(false);
}
@Override
public void close(boolean failed) {
try {
this.coreSession.close(failed);
} catch (Exception bestEffort) {
}
}
public OpenWireConnection getConnection() {
return connection;
}

View File

@ -62,7 +62,7 @@ public interface OperationContext extends IOCompletion {
*/
boolean waitCompletion(long timeout) throws Exception;
default void clear() {
default void reset() {
}
}

View File

@ -449,7 +449,7 @@ public class OperationContextImpl implements OperationContext {
}
@Override
public synchronized void clear() {
public synchronized void reset() {
stored = 0;
storeLineUpField = 0;
minimalReplicated = 0;

View File

@ -1534,7 +1534,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (consumer == exclusiveConsumer) {
exclusiveConsumer = null;
// await context completion such that any delivered are returned
// to the queue. Preserve an ordered view for the next exclusive
// consumer
storageManager.afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
releaseExclusiveConsumer();
}
@Override
public void done() {
releaseExclusiveConsumer();
}
});
}
groups.removeIf(consumer::equals);
@ -1543,6 +1559,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private void releaseExclusiveConsumer() {
synchronized (QueueImpl.this) {
exclusiveConsumer = null;
resetAllIterators();
}
deliverAsync();
}
private void stopDispatch() {
boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(false));
if (stopped) {
@ -3136,7 +3160,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (holder == null) {
// this shouldn't happen, however I'm adding this check just in case
logger.debug("consumers.next() returned null.");
consumers.remove();
deliverAsync(true);
return false;
}

View File

@ -142,7 +142,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final ServerProducers serverProducers;
protected Transaction tx;
protected volatile Transaction tx;
/** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
* in a failure scenario (client is gone), this will be held between xaEnd and xaCommit. */
@ -395,15 +395,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
callback.close(failed);
}
synchronized (this) {
if (!closed) {
if (server.hasBrokerSessionPlugins()) {
server.callBrokerSessionPlugins(plugin -> plugin.beforeCloseSession(this, failed));
}
if (server.hasBrokerSessionPlugins()) {
server.callBrokerSessionPlugins(plugin -> plugin.beforeCloseSession(this, failed));
}
this.setStarted(false);
if (closed)
return;
if (failed) {
Transaction txToRollback = tx;
@ -432,7 +427,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
}
closed = true;
}
//putting closing of consumers outside the sync block
@ -653,7 +647,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
* Notice that we set autoCommitACK and autoCommitSends to true if tx == null
*/
@Override
public void resetTX(Transaction transaction) {
public synchronized void resetTX(Transaction transaction) {
this.tx = transaction;
this.autoCommitAcks = transaction == null;
this.autoCommitSends = transaction == null;
@ -1724,20 +1718,29 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void close(final boolean failed, final boolean force) {
if (closed)
return;
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
if (force) {
context.clear();
context.reset();
}
context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
callDoClose();
}
@Override
public void done() {
callDoClose();
}
private void callDoClose() {
try {
doClose(failed);
} catch (Exception e) {

View File

@ -382,8 +382,7 @@ public class TransactionImpl implements Transaction {
// We will like to execute afterRollback and clear anything pending
ActiveMQServerLogger.LOGGER.failedToPerformRollback(e);
}
// We want to make sure that nothing else gets done after the commit is issued
// this will eliminate any possibility or races
// We want to make sure that nothing else gets done after the rollback is issued
final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.junit.Assert;
import org.junit.Test;
@ -247,6 +249,45 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
}
}
@Test
public void testSequentialCompletionN() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
final int N = 500;
try {
final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor));
// pending work to queue completions till done
impl.storeLineUp();
for (long l = 0; l < N; l++) {
long finalL = l;
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
completions.add(finalL);
}
});
}
impl.done();
Wait.assertEquals(N, ()-> completions.size());
for (long i = 0; i < N; i++) {
assertEquals("ordered", i, (long) completions.poll());
}
} finally {
executor.shutdownNow();
}
}
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -26,6 +27,8 @@ import javax.jms.TextMessage;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -36,16 +39,25 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -67,7 +79,14 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
// force send to dlq early
addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
// force send to dlq late
addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
// useful to debug if there is some contention that causes a concurrent rollback,
// when true, the rollback journal update uses the operation context and the failure propagates
//serverConfig.setJournalSyncTransactional(true);
}
@Test(timeout = 60_000)
@ -155,7 +174,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
TextMessage message = session.createTextMessage("This is a text message");
int numMessages = 2000;
int numMessages = 10000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
@ -163,7 +182,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
session.commit();
exConn.close();
final int batch = 100;
final int batch = 200;
for (int i = 0; i < numMessages; i += batch) {
// connection per batch
exConn = exFact.createConnection();
@ -172,7 +191,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
session = exConn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage messageReceived = null;
TextMessage messageReceived;
for (int j = 0; j < batch; j++) { // a small batch
messageReceived = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull("null @ i=" + i, messageReceived);
@ -183,7 +202,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
session.commit();
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via close
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class).stop();
exConn.close();
}
} finally {
@ -194,9 +213,429 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
}
@Test(timeout = 60_000)
public void testServerSideRollbackOnCloseOrder() throws Exception {
final ArrayList<Throwable> errors = new ArrayList<>();
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
Queue queue = new ActiveMQQueue(durableQueue.toString());
final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
exFact.setWatchTopicAdvisories(false);
exFact.setConnectResponseTimeout(10000);
ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
prefetchPastMaxDeliveriesInLoop.setAll(2000);
exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay(0);
redeliveryPolicy.setMaximumRedeliveries(-1);
exFact.setRedeliveryPolicy(redeliveryPolicy);
Connection exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
int numMessages = 1000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
}
session.commit();
exConn.close();
final int numConsumers = 2;
ExecutorService executorService = Executors.newCachedThreadPool();
// consume under load
final int numLoadProducers = 2;
underLoad(numLoadProducers, ()-> {
// a bunch of concurrent batch consumers, expecting order
AtomicBoolean done = new AtomicBoolean(false);
AtomicInteger receivedCount = new AtomicInteger();
AtomicInteger inProgressBatch = new AtomicInteger();
final int batch = 100;
final ExecutorService commitExecutor = Executors.newCachedThreadPool();
Runnable consumerTask = () -> {
Connection toCloseOnError = null;
while (!done.get() && receivedCount.get() < 20 * numMessages) {
try (Connection consumerConnection = exFact.createConnection()) {
toCloseOnError = consumerConnection;
((ActiveMQConnection) consumerConnection).setCloseTimeout(1); // so rollback on close won't block after socket close exception
consumerConnection.start();
Session consumerConnectionSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue);
TextMessage messageReceived = null;
int i = 0;
for (; i < batch; i++) {
messageReceived = (TextMessage) messageConsumer.receive(2000);
if (messageReceived == null) {
break;
}
receivedCount.incrementAndGet();
// need to infer batch from seq number and adjust - client never gets commit response
int receivedSeq = messageReceived.getIntProperty("SEQ");
int currentBatch = (receivedSeq / batch);
if (i == 0) {
if (inProgressBatch.get() != currentBatch) {
if (inProgressBatch.get() + 1 == currentBatch) {
inProgressBatch.incrementAndGet(); // all good, next batch
logger.info("@:" + receivedCount.get() + ", current batch increment to: " + inProgressBatch.get() + ", Received Seq: " + receivedSeq + ", Message: " + messageReceived);
} else {
// we have an order problem
done.set(true);
throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" + receivedSeq + ", Message: " + messageReceived);
}
}
}
// verify within batch order
Assert.assertEquals("@:" + receivedCount.get() + " batch out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
}
if (i != batch) {
continue;
}
// manual ack in tx to setup server for rollback work on fail
Transport transport = ((ActiveMQConnection) consumerConnection).getTransport();
TransactionId txId = new LocalTransactionId(((ActiveMQConnection) consumerConnection).getConnectionInfo().getConnectionId(), receivedCount.get());
TransactionInfo tx = new TransactionInfo(((ActiveMQConnection) consumerConnection).getConnectionInfo().getConnectionId(), txId, TransactionInfo.BEGIN);
transport.request(tx);
MessageAck ack = new MessageAck();
ActiveMQMessage mqMessage = (ActiveMQMessage) messageReceived;
ack.setDestination(mqMessage.getDestination());
ack.setMessageID(mqMessage.getMessageId());
ack.setMessageCount(batch);
ack.setTransactionId(tx.getTransactionId());
ack.setConsumerId(((ActiveMQMessageConsumer)messageConsumer).getConsumerId());
transport.request(ack);
try {
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
((ActiveMQConnection) consumerConnection).getTransport().narrow(TcpTransport.class).stop();
} catch (Throwable expected) {
}
} catch (ConcurrentModificationException | NullPointerException ignored) {
} catch (JMSException ignored) {
// expected on executor stop
} catch (Throwable unexpected) {
unexpected.printStackTrace();
errors.add(unexpected);
done.set(true);
} finally {
if (toCloseOnError != null) {
try {
toCloseOnError.close();
} catch (Throwable ignored) {
}
}
}
}
};
for (int i = 0; i < numConsumers; i++) {
executorService.submit(consumerTask);
}
executorService.shutdown();
try {
assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS));
assertTrue(errors.isEmpty());
} catch (Throwable t) {
errors.add(t);
} finally {
done.set(true);
commitExecutor.shutdownNow();
executorService.shutdownNow();
}
Assert.assertTrue("errors: " + errors, errors.isEmpty());
});
Assert.assertTrue(errors.isEmpty());
}
@Test(timeout = 60_000)
public void testExclusiveConsumerBatchOrderUnderLoad() throws Exception {
final ArrayList<Throwable> errors = new ArrayList<>();
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
Queue queue = new ActiveMQQueue(durableQueue.toString());
final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
exFact.setWatchTopicAdvisories(false);
exFact.setConnectResponseTimeout(10000);
ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
prefetchPastMaxDeliveriesInLoop.setAll(2000);
exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay(0);
redeliveryPolicy.setMaximumRedeliveries(-1);
exFact.setRedeliveryPolicy(redeliveryPolicy);
Connection exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
int numMessages = 10000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
}
session.commit();
exConn.close();
final int numConsumers = 4;
ExecutorService executorService = Executors.newCachedThreadPool();
// consume under load
final int numLoadProducers = 4;
underLoad(numLoadProducers, ()-> {
// a bunch of concurrent batch consumers, expecting order
AtomicBoolean done = new AtomicBoolean(false);
AtomicInteger receivedCount = new AtomicInteger();
AtomicInteger inProgressBatch = new AtomicInteger();
final int batch = 200;
final ExecutorService commitExecutor = Executors.newCachedThreadPool();
Runnable consumerTask = () -> {
Connection toCloseOnError = null;
while (!done.get() && server.locateQueue(durableQueue).getMessageCount() > 0L) {
try (Connection consumerConnection = exFact.createConnection()) {
toCloseOnError = consumerConnection;
((ActiveMQConnection) consumerConnection).setCloseTimeout(1); // so rollback on close won't block after socket close exception
consumerConnection.start();
Session consumerConnectionSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue);
TextMessage messageReceived;
int i = 0;
for (; i < batch; i++) {
messageReceived = (TextMessage) messageConsumer.receive(2000);
if (messageReceived == null) {
break;
}
receivedCount.incrementAndGet();
// need to infer batch from seq number and adjust - client never gets commit response
int receivedSeq = messageReceived.getIntProperty("SEQ");
int currentBatch = (receivedSeq / batch);
if (i == 0) {
if (inProgressBatch.get() != currentBatch) {
if (inProgressBatch.get() + 1 == currentBatch) {
inProgressBatch.incrementAndGet(); // all good, next batch
logger.info("@:" + receivedCount.get() + ", current batch increment to: " + inProgressBatch.get() + ", Received Seq: " + receivedSeq + ", Message: " + messageReceived);
} else {
// we have an order problem
done.set(true);
throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" + receivedSeq + ", Message: " + messageReceived);
}
}
}
// verify within batch order
Assert.assertEquals("@:" + receivedCount.get() + " batch out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
}
if (i != batch) {
continue;
}
// arrange concurrent commit - ack/commit of batch
// with server side error, potential for ack/commit and close-on-fail to contend
final CountDownLatch latch = new CountDownLatch(1);
final Session finalSession = consumerConnectionSession;
commitExecutor.submit(() -> {
try {
latch.countDown();
finalSession.commit();
} catch (Throwable expected) {
}
});
latch.await(1, TimeUnit.SECONDS);
// give a chance to have a batch complete to make progress!
TimeUnit.MILLISECONDS.sleep(15);
try {
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
((ActiveMQConnection) consumerConnection).getTransport().narrow(TcpTransport.class).stop();
} catch (Throwable expected) {
}
} catch (InterruptedException | ConcurrentModificationException | NullPointerException ignored) {
} catch (JMSException ignored) {
// expected on executor stop
} catch (Throwable unexpected) {
unexpected.printStackTrace();
errors.add(unexpected);
done.set(true);
} finally {
if (toCloseOnError != null) {
try {
toCloseOnError.close();
} catch (Throwable ignored) {
}
}
}
}
};
for (int i = 0; i < numConsumers; i++) {
executorService.submit(consumerTask);
}
executorService.shutdown();
try {
Wait.assertEquals(0L, () -> {
if (!errors.isEmpty()) {
return -1;
}
return server.locateQueue(durableQueue).getMessageCount();
}, 30 * 1000);
} catch (Throwable t) {
errors.add(t);
} finally {
done.set(true);
commitExecutor.shutdownNow();
executorService.shutdownNow();
}
Assert.assertTrue(errors.isEmpty());
});
Assert.assertTrue(errors.isEmpty());
}
public void underLoad(final int numProducers, Runnable r) throws Exception {
// produce some load with a producer(s)/consumer
SimpleString durableQueue = new SimpleString("exampleQueue");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
ExecutorService executor = Executors.newFixedThreadPool(numProducers + 1);
Queue queue;
ConnectionFactory cf;
boolean useCoreForLoad = true;
if (useCoreForLoad) {
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
connectionFactory.setConfirmationWindowSize(1000000);
connectionFactory.setBlockOnDurableSend(true);
connectionFactory.setBlockOnNonDurableSend(true);
cf = connectionFactory;
queue = connectionFactory.createContext().createQueue(durableQueue.toString());
} else {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?startupMaxReconnectAttempts=0&maxReconnectAttempts=0&timeout=1000");
connectionFactory.setWatchTopicAdvisories(false);
connectionFactory.setCloseTimeout(1);
connectionFactory.setSendTimeout(2000);
cf = connectionFactory;
queue = new ActiveMQQueue(durableQueue.toString());
}
final Queue destination = queue;
final ConnectionFactory connectionFactory = cf;
final AtomicBoolean done = new AtomicBoolean();
Runnable producerTask = ()-> {
try (Connection exConn = connectionFactory.createConnection()) {
exConn.start();
final Session session = exConn.createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer producer = session.createProducer(destination);
final TextMessage message = session.createTextMessage("This is a text message");
int count = 1;
while (!done.get()) {
producer.send(message);
if ((count++ % 100) == 0) {
session.commit();
}
}
} catch (Exception ignored) {
}
};
for (int i = 0; i < numProducers; i++) {
executor.submit(producerTask);
}
// one consumer
executor.submit(()-> {
try (Connection exConn = connectionFactory.createConnection()) {
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(destination);
while (!done.get()) {
messageConsumer.receive(200);
}
} catch (Exception ignored) {
}
});
try {
r.run();
} finally {
done.set(true);
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
logger.info("LOAD ADDED: " + server.locateQueue(durableQueue).getMessagesAdded());
}
}
@Test(timeout = 120_000)
public void testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws Exception {
doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch();
}
public void doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws Exception {
Connection exConn = null;
ExecutorService executorService = Executors.newFixedThreadPool(10);
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
AtomicInteger batchConsumed = new AtomicInteger(0);
@ -221,8 +660,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
TextMessage message = session.createTextMessage("This is a text message");
ExecutorService executorService = Executors.newSingleThreadExecutor();
int numMessages = 600;
int numMessages = 1000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
@ -230,7 +668,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
session.close();
exConn.close();
final int batch = numMessages;
final int batch = 200;
AtomicBoolean done = new AtomicBoolean(false);
while (!done.get()) {
// connection per batch attempt
@ -242,42 +680,53 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
session = exConn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage messageReceived = null;
TextMessage messageReceived;
int received = 0;
for (int j = 0; j < batch; j++) {
messageReceived = (TextMessage) messageConsumer.receive(2000);
if (messageReceived == null) {
done.set(true);
break;
}
received++;
batchConsumed.incrementAndGet();
assertEquals("This is a text message", messageReceived.getText());
int receivedSeq = messageReceived.getIntProperty("SEQ");
// need to infer batch from seq number and adjust - client never gets commit response
Assert.assertEquals("@:" + received + ", out of order", (batch * (receivedSeq / batch)) + j, receivedSeq);
}
// arrange concurrent commit - ack/commit
// with server side error, potential for ack/commit and close-on-fail to contend
final CountDownLatch latch = new CountDownLatch(1);
Session finalSession = session;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
latch.countDown();
finalSession.commit();
executorService.submit(() -> {
try {
latch.countDown();
finalSession.commit();
} catch (JMSException e) {
}
} catch (JMSException ignored) {
}
});
latch.await(1, TimeUnit.SECONDS);
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
((FailoverTransport) ((org.apache.activemq.ActiveMQConnection) exConn).getTransport().narrow(FailoverTransport.class)).stop();
exConn.close();
((org.apache.activemq.ActiveMQConnection) exConn).getTransport().narrow(FailoverTransport.class).stop();
// retry asap, not waiting for client close
final Connection finalConToClose = exConn;
executorService.submit(() -> {
try {
finalConToClose.close();
} catch (JMSException ignored) {
}
});
}
} finally {
if (exConn != null) {
exConn.close();
}
executorService.shutdownNow();
}
logger.info("Done after: {}, queue: {}", batchConsumed.get(), server.locateQueue(durableQueue));

View File

@ -413,7 +413,7 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
connection.getPrefetchPolicy().setAll(prefetchSize);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
@ -433,13 +433,13 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
for (int i = 0; i < messageCount; i++) {
m = consumer.receive(2000);
assertNotNull(m);
assertNotNull("null@:" + i, m);
if (i == 3) {
session.rollback();
continue;
}
session.commit();
assertTrue(queueControl.getDeliveringCount() <= prefetchSize);
assertTrue(queueControl.getDeliveringCount() <= prefetchSize + 1);
}
m = consumer.receive(2000);
@ -448,6 +448,52 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
}
/**
* @throws Exception
*/
@Test
public void testCanRollbackPastPrefetch() throws Exception {
final int prefetchSize = 10;
final int messageCount = 2 * prefetchSize;
connection.getPrefetchPolicy().setAll(prefetchSize);
connection.getRedeliveryPolicy().setMaximumRedeliveries(prefetchSize + 1);
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + "TEST");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < messageCount; i++) {
producer.send(session.createTextMessage("MSG" + i));
session.commit();
}
Message m;
MessageConsumer consumer = session.createConsumer(destination);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
// do prefetch num rollbacks
for (int i = 0; i < prefetchSize; i++) {
m = consumer.receive(2000);
assertNotNull("null@:" + i, m);
session.rollback();
}
// then try and consume
for (int i = 0; i < messageCount; i++) {
m = consumer.receive(2000);
assertNotNull("null@:" + i, m);
session.commit();
assertTrue("deliveryCount: " + queueControl.getDeliveringCount() + " @:" + i, queueControl.getDeliveringCount() <= prefetchSize + 1);
}
}
/**
* @throws Exception
*/