ARTEMIS-4258 delayBeforeDispatch not working with OpenWire

This commit is contained in:
Justin Bertram 2023-04-25 14:12:25 -05:00
parent 383345a4f6
commit 6ba54964bd
2 changed files with 92 additions and 39 deletions

View File

@ -80,6 +80,7 @@ public class AMQConsumer {
//it's address/queue to management service //it's address/queue to management service
private boolean internalAddress = false; private boolean internalAddress = false;
private volatile Set<MessageReference> rolledbackMessageRefs; private volatile Set<MessageReference> rolledbackMessageRefs;
private ScheduledFuture<?> delayedDispatchPrompter;
public AMQConsumer(AMQSession amqSession, public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d, org.apache.activemq.command.ActiveMQDestination d,
@ -179,6 +180,14 @@ public class AMQConsumer {
} }
} }
if (serverConsumer != null && serverConsumer.getQueue() != null && serverConsumer.getQueue().getQueueConfiguration() != null) {
Long delayBeforeDispatch = serverConsumer.getQueue().getQueueConfiguration().getDelayBeforeDispatch();
if (delayBeforeDispatch != null && delayBeforeDispatch > 0) {
Long schedule = delayBeforeDispatch / 2;
delayedDispatchPrompter = scheduledPool.scheduleAtFixedRate(() -> serverConsumer.promptDelivery(), schedule, schedule, TimeUnit.MILLISECONDS);
}
}
serverConsumer.setProtocolData(this); serverConsumer.setProtocolData(this);
} }
@ -404,6 +413,9 @@ public class AMQConsumer {
public void removeConsumer() throws Exception { public void removeConsumer() throws Exception {
serverConsumer.close(false); serverConsumer.close(false);
if (delayedDispatchPrompter != null) {
delayedDispatchPrompter.cancel(false);
}
} }

View File

@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.tests.integration.jms.client; package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -29,43 +28,45 @@ import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** public class JMSConsumerDelayDispatchTest extends MultiprotocolJMSClientTestSupport {
* Exclusive Test
*/
public class ConsumerDelayDispatchTest extends JMSTestBase {
private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue"); private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue");
private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue"); private SimpleString normalQueueName = SimpleString.toSimpleString("jms.normal.queue");
private static final long DELAY_BEFORE_DISPATCH = 10000L; private static final long DELAY_BEFORE_DISPATCH = 10000L;
@Override @Override
@Before protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
public void setUp() throws Exception { super.createAddressAndQueues(server);
super.setUp();
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true).setConsumersBeforeDispatch(2).setDelayBeforeDispatch(DELAY_BEFORE_DISPATCH)); server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true).setConsumersBeforeDispatch(2).setDelayBeforeDispatch(DELAY_BEFORE_DISPATCH));
server.createQueue(new QueueConfiguration(normalQueueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); server.createQueue(new QueueConfiguration(normalQueueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
} }
@Test
protected ConnectionFactory getCF() throws Exception { public void testNoDelayOnDefaultAMQP() throws Exception {
return cf; testNoDelayOnDefault(AMQPConnection);
} }
@Test @Test
public void testNoDelayOnDefault() throws Exception { public void testNoDelayOnDefaultOpenWire() throws Exception {
sendMessage(normalQueueName); testNoDelayOnDefault(OpenWireConnection);
}
ConnectionFactory fact = getCF(); @Test
Connection connection = fact.createConnection(); public void testNoDelayOnDefaultCore() throws Exception {
testNoDelayOnDefault(CoreConnection);
}
private void testNoDelayOnDefault(ConnectionSupplier supplier) throws Exception {
sendMessage(normalQueueName, supplier);
Connection connection = supplier.createConnection();
try { try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start(); connection.start();
@ -79,14 +80,26 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
} }
@Test @Test
public void testDelayBeforeDispatch() throws Exception { public void testDelayBeforeDispatchAMQP() throws Exception {
sendMessage(queueName); testDelayBeforeDispatch(AMQPConnection);
}
ConnectionFactory fact = getCF(); @Test
Connection connection = fact.createConnection(); public void testDelayBeforeDispatchOpenWire() throws Exception {
testDelayBeforeDispatch(OpenWireConnection);
}
@Test
public void testDelayBeforeDispatchCore() throws Exception {
testDelayBeforeDispatch(CoreConnection);
}
private void testDelayBeforeDispatch(ConnectionSupplier supplier) throws Exception {
sendMessage(queueName, supplier);
Connection connection = supplier.createConnection();
try { try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start(); connection.start();
@ -103,12 +116,24 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
} }
@Test @Test
public void testConsumersBeforeDispatch() throws Exception { public void testConsumersBeforeDispatchAMQP() throws Exception {
sendMessage(queueName); testConsumersBeforeDispatch(AMQPConnection);
}
@Test
public void testConsumersBeforeDispatchOpenWire() throws Exception {
testConsumersBeforeDispatch(OpenWireConnection);
}
ConnectionFactory fact = getCF(); @Test
Connection connection = fact.createConnection(); public void testConsumersBeforeDispatchCore() throws Exception {
testConsumersBeforeDispatch(CoreConnection);
}
private void testConsumersBeforeDispatch(ConnectionSupplier supplier) throws Exception {
sendMessage(queueName, supplier);
Connection connection = supplier.createConnection();
try { try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -127,13 +152,25 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
} }
} }
@Test
public void testContinueAndResetConsumerAMQP() throws Exception {
testContinueAndResetConsumer(AMQPConnection);
}
@Test @Test
public void testContinueAndResetConsumer() throws Exception { public void testContinueAndResetConsumerOpenWire() throws Exception {
sendMessage(queueName); testContinueAndResetConsumer(OpenWireConnection);
}
ConnectionFactory fact = getCF(); @Test
Connection connection = fact.createConnection(); public void testContinueAndResetConsumerCore() throws Exception {
testContinueAndResetConsumer(CoreConnection);
}
private void testContinueAndResetConsumer(ConnectionSupplier supplier) throws Exception {
sendMessage(queueName, supplier);
Connection connection = supplier.createConnection();
try { try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -151,15 +188,17 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
consumer2.close(); consumer2.close();
//Ensure that now dispatch is active, if we close a consumer, dispatching continues. //Ensure that now dispatch is active, if we close a consumer, dispatching continues.
sendMessage(queueName); sendMessage(queueName, supplier);
Assert.assertNotNull(receive(consumer1)); Assert.assertNotNull(receive(consumer1));
//Stop all consumers, which should reset dispatch rules. //Stop all consumers, which should reset dispatch rules.
consumer1.close(); consumer1.close();
session.close();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers. //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers.
sendMessage(queueName); sendMessage(queueName, supplier);
MessageConsumer consumer3 = session.createConsumer(queue); MessageConsumer consumer3 = session.createConsumer(queue);
@ -173,9 +212,11 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
//Stop all consumers, which should reset dispatch rules. //Stop all consumers, which should reset dispatch rules.
consumer3.close(); consumer3.close();
consumer4.close(); consumer4.close();
session.close();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay. //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay.
sendMessage(queueName); sendMessage(queueName, supplier);
MessageConsumer consumer5 = session.createConsumer(queue); MessageConsumer consumer5 = session.createConsumer(queue);
@ -191,6 +232,7 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
} }
private Message receive(MessageConsumer consumer1) throws JMSException { private Message receive(MessageConsumer consumer1) throws JMSException {
System.out.println("receiving...");
return consumer1.receive(1000); return consumer1.receive(1000);
} }
@ -202,9 +244,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
return receivedMessage; return receivedMessage;
} }
public void sendMessage(SimpleString queue) throws Exception { public void sendMessage(SimpleString queue, ConnectionSupplier supplier) throws Exception {
ConnectionFactory fact = getCF(); Connection connection = supplier.createConnection();
Connection connection = fact.createConnection();
try { try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);