ARTEMIS-1315 Client disconnection may cause consumer to hang

When calling a consumer to receive message with a timeout
(receive(long timeout), if the consumer's buffer is empty, it sends
a 'forced delivery' the waiting forever, expecting the server to
send back a 'forced delivery" message if the queue is empty.

If the connection is disconnected as the arrived 'forced
delivery' message is corrupted, this 'forced delivery' message
never gets to consumer. After the session is reconnected,
the consumer never knows that and stays waiting.

To fix that we can send a 'forced delivery' to server right
after the session is reconnected.
This commit is contained in:
Howard Gao 2017-08-02 09:49:54 +08:00 committed by Clebert Suconic
parent c3f8321c42
commit 613b459c52
9 changed files with 95 additions and 6 deletions

View File

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -123,7 +124,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private boolean stopped = false;
private long forceDeliveryCount;
private AtomicLong forceDeliveryCount = new AtomicLong(0);
private final ClientSession.QueueQuery queueInfo;
@ -295,7 +296,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
logger.trace(this + "::Forcing delivery");
}
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
sessionContext.forceDelivery(this, forceDeliveryCount++);
sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement());
callForceDelivery = false;
deliveryForced = true;
continue;
@ -309,7 +310,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
// Need to check if forceDelivery was called at this call
// As we could be receiving a message that came from a previous call
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1) {
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get() - 1) {
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
@ -538,6 +539,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
return queueInfo;
}
@Override
public long getForceDeliveryCount() {
return forceDeliveryCount.get();
}
@Override
public SimpleString getFilterString() {
return filterString;

View File

@ -68,4 +68,6 @@ public interface ClientConsumerInternal extends ClientConsumer {
void start();
ClientSession.QueueQuery getQueueInfo();
long getForceDeliveryCount();
}

View File

@ -1242,7 +1242,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
ClientConsumerInternal consumerInternal = entryx.getValue();
sessionContext.recreateConsumerOnServer(consumerInternal);
sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
}
if ((!autoCommitAcks || !autoCommitSends) && workDone) {

View File

@ -26,6 +26,7 @@ public class ActiveMQConsumerContext extends ConsumerContext {
this.id = id;
}
@Override
public long getId() {
return id;
}

View File

@ -690,7 +690,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException {
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
// We try and recreate any non durable queues, since they probably won't be there unless
@ -717,6 +717,19 @@ public class ActiveMQSessionContext extends SessionContext {
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1);
sendPacketWithoutLock(sessionChannel, packet);
}
//force a delivery to avoid a infinite waiting
//it can happen when the consumer sends a 'forced delivery' then
//waiting forever, while the connection is broken and the server's
//'forced delivery' message never gets to consumer. If session
//is reconnected, its consumer never knows and stays waiting.
//note this message will either be ignored by consumer (forceDeliveryCount
//doesn't match, which is fine) or be caught by consumer
//(in which case the consumer will wake up, thus avoid the infinite waiting).
if (isSessionStarted && consumerInternal.getForceDeliveryCount() > 0) {
SessionForceConsumerDelivery forceDel = new SessionForceConsumerDelivery(consumerId, consumerInternal.getForceDeliveryCount() - 1);
sendPacketWithoutLock(sessionChannel, forceDel);
}
}
@Override

View File

@ -18,4 +18,5 @@ package org.apache.activemq.artemis.spi.core.remoting;
public abstract class ConsumerContext {
public abstract long getId();
}

View File

@ -286,7 +286,7 @@ public abstract class SessionContext {
boolean autoCommitAcks,
boolean preAcknowledge) throws ActiveMQException;
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException;
public abstract void xaFailed(Xid xid) throws ActiveMQException;

View File

@ -18,7 +18,9 @@
package org.apache.activemq.artemis.tests.extras.byteman;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
@ -30,6 +32,7 @@ import org.junit.runner.RunWith;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@ -127,6 +130,64 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
}
}
@Test(timeout = 60000)
@BMRules(
rules = {@BMRule(
name = "Corrupt Decoding",
targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
public void testClientDisconnectLarge() throws Exception {
Queue q1 = createQueue("queue1");
final Connection connection = nettyCf.createConnection();
final CountDownLatch latch = new CountDownLatch(1);
ServerLocator locator = ((ActiveMQConnectionFactory)nettyCf).getServerLocator();
int minSize = locator.getMinLargeMessageSize();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < minSize; i++) {
builder.append("a");
}
try {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException e) {
latch.countDown();
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(q1);
TextMessage m = session.createTextMessage(builder.toString());
producer.send(m);
connection.start();
corruptPacket.set(true);
MessageConsumer consumer = session.createConsumer(q1);
Message lm = consumer.receive(2000);
//first receive won't crash because the packet
//is SESS_RECEIVE_LARGE_MSG
assertNotNull(lm);
//second receive will force server to send a
//"forced delivery" message, and will cause
//the exception to be thrown.
lm = consumer.receive(5000);
assertNull(lm);
assertTrue(latch.await(5, TimeUnit.SECONDS));
} finally {
corruptPacket.set(false);
if (connection != null) {
connection.close();
}
}
}
public static void doThrow(ActiveMQBuffer buff) {
byte type = buff.getByte(buff.readerIndex());
if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {

View File

@ -786,6 +786,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
return null;
}
@Override
public long getForceDeliveryCount() {
return 0;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
*/