diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index c3d6749105..ab59eb6651 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -894,12 +894,20 @@ public interface ClientSession extends XAResource, AutoCloseable { boolean isXA(); /** - * Commits the current transaction. + * Commits the current transaction, blocking. * * @throws ActiveMQException if an exception occurs while committing the transaction */ void commit() throws ActiveMQException; + /** + * Commits the current transaction. + * + * @param block if the commit will be blocking or not. + * @throws ActiveMQException if an exception occurs while committing the transaction + */ + void commit(boolean block) throws ActiveMQException; + /** * Rolls back the current transaction. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 5f6b40bf22..ef4e87cff2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -761,6 +761,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void commit() throws ActiveMQException { + commit(true); + } + + @Override + public void commit(boolean block) throws ActiveMQException { checkClosed(); if (logger.isTraceEnabled()) { @@ -782,8 +787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (rollbackOnly) { rollbackOnFailover(true); } + startCall(); try { - sessionContext.simpleCommit(); + sessionContext.simpleCommit(block); } catch (ActiveMQException e) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) { // The call to commit was unlocked on failover, we therefore rollback the tx, @@ -794,6 +800,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } else { throw e; } + } finally { + endCall(); } //oops, we have failed over during the commit and don't know what happened diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index fc436725d8..d0d75ac346 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -348,6 +348,15 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); } + @Override + public void simpleCommit(boolean block) throws ActiveMQException { + if (block) { + sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); + } else { + sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT)); + } + } + @Override public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException { sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index b1239605ef..78135a8bec 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -214,6 +214,9 @@ public abstract class SessionContext { public abstract void simpleCommit() throws ActiveMQException; + public abstract void simpleCommit(boolean block) throws ActiveMQException; + + /** * If we are doing a simple rollback on the RA, we need to ack the last message sent to the consumer, * otherwise DLQ won't work. diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index 6432af2774..bf0d236c8b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -597,7 +597,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme try { ClientSession session; - + boolean isBlockOnAcknowledge = sessionFactory.getServerLocator().isBlockOnAcknowledge(); + int ackBatchSize = sessionFactory.getServerLocator().getAckBatchSize(); if (acknowledgeMode == Session.SESSION_TRANSACTED) { session = sessionFactory.createSession(username, password, isXA, false, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize); } else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) { @@ -605,9 +606,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme } else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) { session = sessionFactory.createSession(username, password, isXA, true, true, sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize); } else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) { - session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize); + session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize); } else if (acknowledgeMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) { - session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize); + session = sessionFactory.createSession(username, password, isXA, true, false, false, isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize); } else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) { session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize); } else { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index f13f602ef4..928d375723 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.reader.MessageUtil; @@ -200,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message { private boolean individualAck; + private boolean clientAck; + private long jmsDeliveryTime; // Constructors -------------------------------------------------- @@ -710,11 +713,15 @@ public class ActiveMQMessage implements javax.jms.Message { public void acknowledge() throws JMSException { if (session != null) { try { + if (session.isClosed()) { + throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed(); + } if (individualAck) { message.individualAcknowledge(); } - - session.commit(); + if (clientAck || individualAck) { + session.commit(session.isBlockOnAcknowledge()); + } } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } @@ -777,6 +784,10 @@ public class ActiveMQMessage implements javax.jms.Message { this.individualAck = true; } + public void setClientAcknowledge() { + this.clientAck = true; + } + public void resetMessageID(final String newMsgID) { this.msgID = newMsgID; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 3d7fa56fc3..4664bb9f0c 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -237,6 +237,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr // https://issues.jboss.org/browse/JBPAPP-6110 if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) { jmsMsg.setIndividualAcknowledge(); + } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { + jmsMsg.setClientAcknowledge(); + coreMessage.acknowledge(); } else { coreMessage.acknowledge(); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java index 92ae226e6a..5d9f6ed259 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java @@ -41,6 +41,8 @@ public class JMSMessageListenerWrapper implements MessageHandler { private final boolean individualACK; + private final boolean clientACK; + protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options, final ActiveMQConnection connection, final ActiveMQSession session, @@ -60,6 +62,8 @@ public class JMSMessageListenerWrapper implements MessageHandler { transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA(); individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE); + + clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE); } /** @@ -74,11 +78,14 @@ public class JMSMessageListenerWrapper implements MessageHandler { msg.setIndividualAcknowledge(); } + if (clientACK) { + msg.setClientAcknowledge(); + } + try { msg.doBeforeReceive(); } catch (Exception e) { ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e); - return; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java index d242da88b7..5cefbd0b8b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java @@ -179,9 +179,10 @@ public class JmsConsumerTest extends JMSTestBase { } SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME); + conn.close(); + Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount()); Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable())); - conn.close(); } @Test @@ -225,9 +226,10 @@ public class JmsConsumerTest extends JMSTestBase { } SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME); + context.close(); + Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount()); Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable())); - context.close(); } @Test @@ -299,9 +301,10 @@ public class JmsConsumerTest extends JMSTestBase { } SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME); + conn.close(); + Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount()); Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable())); - conn.close(); } @Test diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java index 17927b102d..a9ede4b8df 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jms.tests; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -31,6 +33,9 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import java.util.concurrent.CountDownLatch; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.junit.Assert; import org.junit.Test; @@ -1297,4 +1302,68 @@ public class AcknowledgementTest extends JMSTestCase { checkEmpty(queue1); } + + /** + * Ensure no blocking calls in acknowledge flow when block on acknowledge = false. + * This is done by checking the performance compared to blocking is much improved. + */ + @Test + public void testNonBlockingAckPerf() throws Exception { + getJmsServerManager().createConnectionFactory("testsuitecf1", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf1"); + getJmsServerManager().createConnectionFactory("testsuitecf2", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf2"); + + ActiveMQJMSConnectionFactory cf1 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf1"); + cf1.setBlockOnAcknowledge(false); + ActiveMQJMSConnectionFactory cf2 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf2"); + cf2.setBlockOnAcknowledge(true); + + int messageCount = 10000; + + long sendT1 = send(cf1, queue1, messageCount); + long sendT2 = send(cf2, queue2, messageCount); + + long time1 = consume(cf1, queue1, messageCount); + long time2 = consume(cf2, queue2, messageCount); + + log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + time1); + log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + time2); + + Assert.assertTrue(time1 < (time2 / 2)); + + } + + private long send(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException { + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + try (Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE)) { + MessageProducer producer = session.createProducer(destination); + Message m = session.createTextMessage("testing123"); + long start = System.nanoTime(); + for (int i = 0; i < messageCount; i++) { + producer.send(m); + } + session.commit(); + long end = System.nanoTime(); + return end - start; + } + } + } + + private long consume(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException { + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) { + MessageConsumer consumer = session.createConsumer(destination); + long start = System.nanoTime(); + for (int i = 0; i < messageCount; i++) { + Message message = consumer.receive(100); + if (message != null) { + message.acknowledge(); + } + } + long end = System.nanoTime(); + return end - start; + } + } + } } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java index efea04552e..39ea0e36d5 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java @@ -1275,6 +1275,10 @@ public class MessageHeaderTest extends MessageHeaderTestBase { public void commit() throws ActiveMQException { } + @Override + public void commit(boolean block) throws ActiveMQException { + } + @Override public boolean isRollbackOnly() {