This commit is contained in:
Justin Bertram 2022-02-17 14:36:39 -06:00
commit 7cc66a8299
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
3 changed files with 82 additions and 1 deletions

View File

@ -113,7 +113,7 @@ public class AMQConsumer {
return rolledbackMessageRefs;
}
private Set<MessageReference> getRolledbackMessageRefs() {
protected Set<MessageReference> getRolledbackMessageRefs() {
return this.rolledbackMessageRefs;
}
@ -343,6 +343,7 @@ public class AMQConsumer {
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction, serverConsumer);
removeRolledback(ref);
}
} else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
@ -353,6 +354,7 @@ public class AMQConsumer {
((QueueImpl) ref.getQueue()).incDelivering(ref);
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
removeRolledback(ref);
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.artemis.core.server.MessageReference;
import java.util.Set;
public class AMQConsumerAccessor {
public static Set<MessageReference> getRolledbackMessageRefs(AMQConsumer amqConsumer) {
return amqConsumer.getRolledbackMessageRefs();
}
}

View File

@ -29,6 +29,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerAccessor;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
@ -479,6 +483,53 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
}
/**
* @throws Exception
*/
@Test
public void testRedeliveryRefCleanup() throws Exception {
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(-1);
policy.setRedeliveryDelay(50);
connection.start();
Session pSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Session cSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
MessageProducer producer = pSession.createProducer(destination);
MessageConsumer consumer = cSession.createConsumer(destination);
TextMessage m;
for (int i = 0; i < 5; ++i) {
producer.send(pSession.createTextMessage("MessageText"));
pSession.commit();
m = (TextMessage) consumer.receive(2000);
assertNotNull(m);
cSession.rollback();
m = (TextMessage) consumer.receive(2000);
assertNotNull(m);
cSession.commit();
}
ServerConsumer serverConsumer = null;
for (ServerSession session : server.getSessions()) {
for (ServerConsumer sessionConsumer : session.getServerConsumers()) {
if (sessionConsumer.getQueue().getName().toString() == "TEST") {
serverConsumer = sessionConsumer;
}
}
}
AMQConsumer amqConsumer = (AMQConsumer) serverConsumer.getProtocolData();
assertTrue(AMQConsumerAccessor.getRolledbackMessageRefs(amqConsumer).isEmpty());
}
@Test
public void testInitialRedeliveryDelayZero() throws Exception {
// Receive a message with the JMS API