This commit is contained in:
Clebert Suconic 2018-05-21 18:07:54 -04:00
commit ef03ce4ee9
4 changed files with 106 additions and 2 deletions

View File

@ -1294,6 +1294,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
referenceIterator.remove();
ref.incrementDeliveryCount();
consumer.backToDelivering(ref);
final AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
amqConsumer.addRolledback(ref);
}
}
}

View File

@ -17,8 +17,11 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -68,6 +71,7 @@ public class AMQConsumer {
//internal means we don't expose
//it's address/queue to management service
private boolean internalAddress = false;
private volatile Set<MessageReference> rolledbackMessageRefs;
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
@ -85,6 +89,30 @@ public class AMQConsumer {
messagePullHandler = new MessagePullHandler();
}
this.internalAddress = internalAddress;
this.rolledbackMessageRefs = null;
}
private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
synchronized (this) {
Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
if (rollbackedMessageRefs == null) {
rollbackedMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID));
this.rolledbackMessageRefs = rollbackedMessageRefs;
}
return rollbackedMessageRefs;
}
}
private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
if (rolledbackMessageRefs == null) {
rolledbackMessageRefs = guardedInitializationOfRolledBackMessageRefs();
}
return rolledbackMessageRefs;
}
private Set<MessageReference> getRolledbackMessageRefs() {
return this.rolledbackMessageRefs;
}
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@ -353,7 +381,6 @@ public class AMQConsumer {
}
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
//in activemq5, closing a durable subscription won't close the consumer
@ -373,6 +400,8 @@ public class AMQConsumer {
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
} else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
ref.incrementDeliveryCount();
}
return true;
@ -430,4 +459,24 @@ public class AMQConsumer {
}
}
}
public boolean removeRolledback(MessageReference messageReference) {
final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
if (rolledbackMessageRefs == null) {
return false;
}
return rolledbackMessageRefs.remove(messageReference);
}
public void addRolledback(MessageReference messageReference) {
getRolledbackMessageRefsOrCreate().add(messageReference);
}
private boolean isRolledBack(MessageReference messageReference) {
final Set<MessageReference> rollbackedMessageRefs = getRolledbackMessageRefs();
if (rollbackedMessageRefs == null) {
return false;
}
return rollbackedMessageRefs.contains(messageReference);
}
}

View File

@ -308,7 +308,8 @@ public class AMQSession implements SessionCallback {
ServerConsumer consumer,
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
// TODO: use encoders and proper conversions here
//clear up possible rolledback ids.
theConsumer.removeRolledback(reference);
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
}

View File

@ -21,9 +21,11 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
@ -633,4 +635,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
session.commit();
}
@Test
public void testClientRedlivery() throws Exception {
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
this.makeSureCoreQueueExist("TEST");
Queue queue = session.createQueue("TEST");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("test"));
} finally {
connection.close();
}
for (int i = 0; i < 10; ++i) {
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull("Message null on iteration " + i, message);
System.out.println("received message: " + i);
System.out.println("is redelivered: " + message.getJMSRedelivered());
if (i > 0) {
assertTrue(message.getJMSRedelivered());
}
} finally {
connection.close();
}
}
}
}