This commit is contained in:
Clebert Suconic 2017-08-08 10:07:15 -04:00
commit 6f3370b9fe
11 changed files with 137 additions and 11 deletions
artemis-core-client/src/main/java/org/apache/activemq/artemis
api/core/client
core
spi/core/remoting
artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client
tests
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer
jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests

View File

@ -894,12 +894,20 @@ public interface ClientSession extends XAResource, AutoCloseable {
boolean isXA();
/**
* Commits the current transaction.
* Commits the current transaction, blocking.
*
* @throws ActiveMQException if an exception occurs while committing the transaction
*/
void commit() throws ActiveMQException;
/**
* Commits the current transaction.
*
* @param block if the commit will be blocking or not.
* @throws ActiveMQException if an exception occurs while committing the transaction
*/
void commit(boolean block) throws ActiveMQException;
/**
* Rolls back the current transaction.
*

View File

@ -761,6 +761,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void commit() throws ActiveMQException {
commit(true);
}
@Override
public void commit(boolean block) throws ActiveMQException {
checkClosed();
if (logger.isTraceEnabled()) {
@ -782,8 +787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
if (rollbackOnly) {
rollbackOnFailover(true);
}
startCall();
try {
sessionContext.simpleCommit();
sessionContext.simpleCommit(block);
} catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
// The call to commit was unlocked on failover, we therefore rollback the tx,
@ -794,6 +800,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} else {
throw e;
}
} finally {
endCall();
}
//oops, we have failed over during the commit and don't know what happened

View File

@ -348,6 +348,15 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
}
@Override
public void simpleCommit(boolean block) throws ActiveMQException {
if (block) {
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
} else {
sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
}
}
@Override
public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException {
sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE);

View File

@ -214,6 +214,9 @@ public abstract class SessionContext {
public abstract void simpleCommit() throws ActiveMQException;
public abstract void simpleCommit(boolean block) throws ActiveMQException;
/**
* If we are doing a simple rollback on the RA, we need to ack the last message sent to the consumer,
* otherwise DLQ won't work.

View File

@ -597,7 +597,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
try {
ClientSession session;
boolean isBlockOnAcknowledge = sessionFactory.getServerLocator().isBlockOnAcknowledge();
int ackBatchSize = sessionFactory.getServerLocator().getAckBatchSize();
if (acknowledgeMode == Session.SESSION_TRANSACTED) {
session = sessionFactory.createSession(username, password, isXA, false, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
} else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) {
@ -605,9 +606,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
} else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, true, sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize);
} else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
} else if (acknowledgeMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
session = sessionFactory.createSession(username, password, isXA, true, false, false, isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
} else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize);
} else {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
@ -200,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message {
private boolean individualAck;
private boolean clientAck;
private long jmsDeliveryTime;
// Constructors --------------------------------------------------
@ -710,11 +713,15 @@ public class ActiveMQMessage implements javax.jms.Message {
public void acknowledge() throws JMSException {
if (session != null) {
try {
if (session.isClosed()) {
throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
}
if (individualAck) {
message.individualAcknowledge();
}
session.commit();
if (clientAck || individualAck) {
session.commit(session.isBlockOnAcknowledge());
}
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
@ -777,6 +784,10 @@ public class ActiveMQMessage implements javax.jms.Message {
this.individualAck = true;
}
public void setClientAcknowledge() {
this.clientAck = true;
}
public void resetMessageID(final String newMsgID) {
this.msgID = newMsgID;
}

View File

@ -237,6 +237,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
// https://issues.jboss.org/browse/JBPAPP-6110
if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
jmsMsg.setIndividualAcknowledge();
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
jmsMsg.setClientAcknowledge();
coreMessage.acknowledge();
} else {
coreMessage.acknowledge();
}

View File

@ -41,6 +41,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
private final boolean individualACK;
private final boolean clientACK;
protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,
final ActiveMQConnection connection,
final ActiveMQSession session,
@ -60,6 +62,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);
}
/**
@ -74,11 +78,14 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.setIndividualAcknowledge();
}
if (clientACK) {
msg.setClientAcknowledge();
}
try {
msg.doBeforeReceive();
} catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
return;
}

View File

@ -179,9 +179,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
conn.close();
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
conn.close();
}
@Test
@ -225,9 +226,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
context.close();
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
context.close();
}
@Test
@ -299,9 +301,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
conn.close();
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
conn.close();
}
@Test

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jms.tests;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@ -31,6 +33,9 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Assert;
import org.junit.Test;
@ -1297,4 +1302,68 @@ public class AcknowledgementTest extends JMSTestCase {
checkEmpty(queue1);
}
/**
* Ensure no blocking calls in acknowledge flow when block on acknowledge = false.
* This is done by checking the performance compared to blocking is much improved.
*/
@Test
public void testNonBlockingAckPerf() throws Exception {
getJmsServerManager().createConnectionFactory("testsuitecf1", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf1");
getJmsServerManager().createConnectionFactory("testsuitecf2", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf2");
ActiveMQJMSConnectionFactory cf1 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf1");
cf1.setBlockOnAcknowledge(false);
ActiveMQJMSConnectionFactory cf2 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf2");
cf2.setBlockOnAcknowledge(true);
int messageCount = 10000;
long sendT1 = send(cf1, queue1, messageCount);
long sendT2 = send(cf2, queue2, messageCount);
long time1 = consume(cf1, queue1, messageCount);
long time2 = consume(cf2, queue2, messageCount);
log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + time1);
log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + time2);
Assert.assertTrue(time1 < (time2 / 2));
}
private long send(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE)) {
MessageProducer producer = session.createProducer(destination);
Message m = session.createTextMessage("testing123");
long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
producer.send(m);
}
session.commit();
long end = System.nanoTime();
return end - start;
}
}
}
private long consume(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
MessageConsumer consumer = session.createConsumer(destination);
long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
Message message = consumer.receive(100);
if (message != null) {
message.acknowledge();
}
}
long end = System.nanoTime();
return end - start;
}
}
}
}

View File

@ -1275,6 +1275,10 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
public void commit() throws ActiveMQException {
}
@Override
public void commit(boolean block) throws ActiveMQException {
}
@Override
public boolean isRollbackOnly() {