This closes #1434
This commit is contained in:
commit
42dbd02bda
|
@ -23,6 +23,7 @@ import java.util.Iterator;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -123,7 +124,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
|
|
||||||
private boolean stopped = false;
|
private boolean stopped = false;
|
||||||
|
|
||||||
private long forceDeliveryCount;
|
private AtomicLong forceDeliveryCount = new AtomicLong(0);
|
||||||
|
|
||||||
private final ClientSession.QueueQuery queueInfo;
|
private final ClientSession.QueueQuery queueInfo;
|
||||||
|
|
||||||
|
@ -295,7 +296,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
logger.trace(this + "::Forcing delivery");
|
logger.trace(this + "::Forcing delivery");
|
||||||
}
|
}
|
||||||
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
|
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
|
||||||
sessionContext.forceDelivery(this, forceDeliveryCount++);
|
sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement());
|
||||||
callForceDelivery = false;
|
callForceDelivery = false;
|
||||||
deliveryForced = true;
|
deliveryForced = true;
|
||||||
continue;
|
continue;
|
||||||
|
@ -309,7 +310,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
|
|
||||||
// Need to check if forceDelivery was called at this call
|
// Need to check if forceDelivery was called at this call
|
||||||
// As we could be receiving a message that came from a previous call
|
// As we could be receiving a message that came from a previous call
|
||||||
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1) {
|
if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get() - 1) {
|
||||||
// forced delivery messages are discarded, nothing has been delivered by the queue
|
// forced delivery messages are discarded, nothing has been delivered by the queue
|
||||||
resetIfSlowConsumer();
|
resetIfSlowConsumer();
|
||||||
|
|
||||||
|
@ -538,6 +539,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
return queueInfo;
|
return queueInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getForceDeliveryCount() {
|
||||||
|
return forceDeliveryCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString getFilterString() {
|
public SimpleString getFilterString() {
|
||||||
return filterString;
|
return filterString;
|
||||||
|
|
|
@ -68,4 +68,6 @@ public interface ClientConsumerInternal extends ClientConsumer {
|
||||||
void start();
|
void start();
|
||||||
|
|
||||||
ClientSession.QueueQuery getQueueInfo();
|
ClientSession.QueueQuery getQueueInfo();
|
||||||
|
|
||||||
|
long getForceDeliveryCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1242,7 +1242,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
|
|
||||||
ClientConsumerInternal consumerInternal = entryx.getValue();
|
ClientConsumerInternal consumerInternal = entryx.getValue();
|
||||||
|
|
||||||
sessionContext.recreateConsumerOnServer(consumerInternal);
|
sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((!autoCommitAcks || !autoCommitSends) && workDone) {
|
if ((!autoCommitAcks || !autoCommitSends) && workDone) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ public class ActiveMQConsumerContext extends ConsumerContext {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getId() {
|
public long getId() {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
|
@ -690,7 +690,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException {
|
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException {
|
||||||
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
||||||
|
|
||||||
// We try and recreate any non durable queues, since they probably won't be there unless
|
// We try and recreate any non durable queues, since they probably won't be there unless
|
||||||
|
@ -717,6 +717,19 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1);
|
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1);
|
||||||
sendPacketWithoutLock(sessionChannel, packet);
|
sendPacketWithoutLock(sessionChannel, packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//force a delivery to avoid a infinite waiting
|
||||||
|
//it can happen when the consumer sends a 'forced delivery' then
|
||||||
|
//waiting forever, while the connection is broken and the server's
|
||||||
|
//'forced delivery' message never gets to consumer. If session
|
||||||
|
//is reconnected, its consumer never knows and stays waiting.
|
||||||
|
//note this message will either be ignored by consumer (forceDeliveryCount
|
||||||
|
//doesn't match, which is fine) or be caught by consumer
|
||||||
|
//(in which case the consumer will wake up, thus avoid the infinite waiting).
|
||||||
|
if (isSessionStarted && consumerInternal.getForceDeliveryCount() > 0) {
|
||||||
|
SessionForceConsumerDelivery forceDel = new SessionForceConsumerDelivery(consumerId, consumerInternal.getForceDeliveryCount() - 1);
|
||||||
|
sendPacketWithoutLock(sessionChannel, forceDel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,4 +18,5 @@ package org.apache.activemq.artemis.spi.core.remoting;
|
||||||
|
|
||||||
public abstract class ConsumerContext {
|
public abstract class ConsumerContext {
|
||||||
|
|
||||||
|
public abstract long getId();
|
||||||
}
|
}
|
||||||
|
|
|
@ -286,7 +286,7 @@ public abstract class SessionContext {
|
||||||
boolean autoCommitAcks,
|
boolean autoCommitAcks,
|
||||||
boolean preAcknowledge) throws ActiveMQException;
|
boolean preAcknowledge) throws ActiveMQException;
|
||||||
|
|
||||||
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;
|
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException;
|
||||||
|
|
||||||
public abstract void xaFailed(Xid xid) throws ActiveMQException;
|
public abstract void xaFailed(Xid xid) throws ActiveMQException;
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
package org.apache.activemq.artemis.tests.extras.byteman;
|
package org.apache.activemq.artemis.tests.extras.byteman;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRule;
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRules;
|
import org.jboss.byteman.contrib.bmunit.BMRules;
|
||||||
|
@ -30,6 +32,7 @@ import org.junit.runner.RunWith;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ExceptionListener;
|
import javax.jms.ExceptionListener;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
|
@ -127,6 +130,64 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
@BMRules(
|
||||||
|
rules = {@BMRule(
|
||||||
|
name = "Corrupt Decoding",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
|
||||||
|
targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
|
||||||
|
targetLocation = "ENTRY",
|
||||||
|
action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
|
||||||
|
public void testClientDisconnectLarge() throws Exception {
|
||||||
|
Queue q1 = createQueue("queue1");
|
||||||
|
final Connection connection = nettyCf.createConnection();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
ServerLocator locator = ((ActiveMQConnectionFactory)nettyCf).getServerLocator();
|
||||||
|
int minSize = locator.getMinLargeMessageSize();
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
for (int i = 0; i < minSize; i++) {
|
||||||
|
builder.append("a");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.setExceptionListener(new ExceptionListener() {
|
||||||
|
@Override
|
||||||
|
public void onException(JMSException e) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(q1);
|
||||||
|
TextMessage m = session.createTextMessage(builder.toString());
|
||||||
|
producer.send(m);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
corruptPacket.set(true);
|
||||||
|
MessageConsumer consumer = session.createConsumer(q1);
|
||||||
|
Message lm = consumer.receive(2000);
|
||||||
|
|
||||||
|
//first receive won't crash because the packet
|
||||||
|
//is SESS_RECEIVE_LARGE_MSG
|
||||||
|
assertNotNull(lm);
|
||||||
|
|
||||||
|
//second receive will force server to send a
|
||||||
|
//"forced delivery" message, and will cause
|
||||||
|
//the exception to be thrown.
|
||||||
|
lm = consumer.receive(5000);
|
||||||
|
assertNull(lm);
|
||||||
|
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
} finally {
|
||||||
|
corruptPacket.set(false);
|
||||||
|
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void doThrow(ActiveMQBuffer buff) {
|
public static void doThrow(ActiveMQBuffer buff) {
|
||||||
byte type = buff.getByte(buff.readerIndex());
|
byte type = buff.getByte(buff.readerIndex());
|
||||||
if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {
|
if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {
|
||||||
|
|
|
@ -786,6 +786,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getForceDeliveryCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
|
* @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue