From 613b459c521427058bc1a3af8155c695fcb0df9f Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 2 Aug 2017 09:49:54 +0800 Subject: [PATCH] ARTEMIS-1315 Client disconnection may cause consumer to hang When calling a consumer to receive message with a timeout (receive(long timeout), if the consumer's buffer is empty, it sends a 'forced delivery' the waiting forever, expecting the server to send back a 'forced delivery" message if the queue is empty. If the connection is disconnected as the arrived 'forced delivery' message is corrupted, this 'forced delivery' message never gets to consumer. After the session is reconnected, the consumer never knows that and stays waiting. To fix that we can send a 'forced delivery' to server right after the session is reconnected. --- .../core/client/impl/ClientConsumerImpl.java | 12 +++- .../client/impl/ClientConsumerInternal.java | 2 + .../core/client/impl/ClientSessionImpl.java | 2 +- .../core/impl/ActiveMQConsumerContext.java | 1 + .../core/impl/ActiveMQSessionContext.java | 15 ++++- .../spi/core/remoting/ConsumerContext.java | 1 + .../spi/core/remoting/SessionContext.java | 2 +- .../DisconnectOnCriticalFailureTest.java | 61 +++++++++++++++++++ .../client/impl/LargeMessageBufferTest.java | 5 ++ 9 files changed, 95 insertions(+), 6 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 4b48caac43..ccaa004064 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -123,7 +124,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private boolean stopped = false; - private long forceDeliveryCount; + private AtomicLong forceDeliveryCount = new AtomicLong(0); private final ClientSession.QueueQuery queueInfo; @@ -295,7 +296,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { logger.trace(this + "::Forcing delivery"); } // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks - sessionContext.forceDelivery(this, forceDeliveryCount++); + sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement()); callForceDelivery = false; deliveryForced = true; continue; @@ -309,7 +310,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { // Need to check if forceDelivery was called at this call // As we could be receiving a message that came from a previous call - if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1) { + if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get() - 1) { // forced delivery messages are discarded, nothing has been delivered by the queue resetIfSlowConsumer(); @@ -538,6 +539,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { return queueInfo; } + @Override + public long getForceDeliveryCount() { + return forceDeliveryCount.get(); + } + @Override public SimpleString getFilterString() { return filterString; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java index cc5f3f1f17..177732e24d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java @@ -68,4 +68,6 @@ public interface ClientConsumerInternal extends ClientConsumer { void start(); ClientSession.QueueQuery getQueueInfo(); + + long getForceDeliveryCount(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index f4b80cd0ed..5f6b40bf22 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1242,7 +1242,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi ClientConsumerInternal consumerInternal = entryx.getValue(); - sessionContext.recreateConsumerOnServer(consumerInternal); + sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started); } if ((!autoCommitAcks || !autoCommitSends) && workDone) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java index 65540ee5b2..5af0e5ea8b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java @@ -26,6 +26,7 @@ public class ActiveMQConsumerContext extends ConsumerContext { this.id = id; } + @Override public long getId() { return id; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index b8eb22c9fe..fc436725d8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -690,7 +690,7 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException { + public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException { ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); // We try and recreate any non durable queues, since they probably won't be there unless @@ -717,6 +717,19 @@ public class ActiveMQSessionContext extends SessionContext { SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1); sendPacketWithoutLock(sessionChannel, packet); } + + //force a delivery to avoid a infinite waiting + //it can happen when the consumer sends a 'forced delivery' then + //waiting forever, while the connection is broken and the server's + //'forced delivery' message never gets to consumer. If session + //is reconnected, its consumer never knows and stays waiting. + //note this message will either be ignored by consumer (forceDeliveryCount + //doesn't match, which is fine) or be caught by consumer + //(in which case the consumer will wake up, thus avoid the infinite waiting). + if (isSessionStarted && consumerInternal.getForceDeliveryCount() > 0) { + SessionForceConsumerDelivery forceDel = new SessionForceConsumerDelivery(consumerId, consumerInternal.getForceDeliveryCount() - 1); + sendPacketWithoutLock(sessionChannel, forceDel); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java index f2c70cf192..cd50cc5bca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java @@ -18,4 +18,5 @@ package org.apache.activemq.artemis.spi.core.remoting; public abstract class ConsumerContext { + public abstract long getId(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index fe3feb2de1..b1239605ef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -286,7 +286,7 @@ public abstract class SessionContext { boolean autoCommitAcks, boolean preAcknowledge) throws ActiveMQException; - public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException; + public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException; public abstract void xaFailed(Xid xid) throws ActiveMQException; diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java index fea3bf45f8..2be7d67269 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java @@ -18,7 +18,9 @@ package org.apache.activemq.artemis.tests.extras.byteman; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; @@ -30,6 +32,7 @@ import org.junit.runner.RunWith; import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -127,6 +130,64 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase { } } + @Test(timeout = 60000) + @BMRules( + rules = {@BMRule( + name = "Corrupt Decoding", + targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder", + targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")}) + public void testClientDisconnectLarge() throws Exception { + Queue q1 = createQueue("queue1"); + final Connection connection = nettyCf.createConnection(); + final CountDownLatch latch = new CountDownLatch(1); + ServerLocator locator = ((ActiveMQConnectionFactory)nettyCf).getServerLocator(); + int minSize = locator.getMinLargeMessageSize(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < minSize; i++) { + builder.append("a"); + } + + try { + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException e) { + latch.countDown(); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(q1); + TextMessage m = session.createTextMessage(builder.toString()); + producer.send(m); + connection.start(); + + corruptPacket.set(true); + MessageConsumer consumer = session.createConsumer(q1); + Message lm = consumer.receive(2000); + + //first receive won't crash because the packet + //is SESS_RECEIVE_LARGE_MSG + assertNotNull(lm); + + //second receive will force server to send a + //"forced delivery" message, and will cause + //the exception to be thrown. + lm = consumer.receive(5000); + assertNull(lm); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } finally { + corruptPacket.set(false); + + if (connection != null) { + connection.close(); + } + } + } + public static void doThrow(ActiveMQBuffer buff) { byte type = buff.getByte(buff.readerIndex()); if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java index a0bc01ca65..d78f070174 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -786,6 +786,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase { return null; } + @Override + public long getForceDeliveryCount() { + return 0; + } + /* (non-Javadoc) * @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession() */