From 59bff3b36b14fd00c0c06f3a53320a5f9c99f403 Mon Sep 17 00:00:00 2001 From: jbertram Date: Fri, 7 Oct 2016 11:43:19 -0500 Subject: [PATCH] ARTEMIS-767 consumer with pre-ack flagged as slow --- .../core/server/impl/ServerConsumerImpl.java | 1 + .../integration/client/SlowConsumerTest.java | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 24eacf53d7..1318ff31df 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -381,6 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // With pre-ack, we ack *before* sending to the client ref.getQueue().acknowledge(ref); + acks++; } if (message.isLargeMessage() && this.supportLargeMessage) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index a2d60d98c0..3643f77991 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Before; @@ -209,6 +210,75 @@ public class SlowConsumerTest extends ActiveMQTestBase { assertTrue(notifLatch.await(3, TimeUnit.SECONDS)); } + @Test + public void testSlowConsumerWithPreAckNotification() throws Exception { + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, true)); + + session.createQueue(QUEUE, QUEUE, null, false); + + AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + + server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); + server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + final int numMessages = 25; + + for (int i = 0; i < numMessages; i++) { + producer.send(createTextMessage(session, "m" + i)); + } + + SimpleString notifQueue = RandomUtil.randomSimpleString(); + + session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false); + + ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'"); + + final CountDownLatch notifLatch = new CountDownLatch(1); + + notifConsumer.setMessageHandler(new MessageHandler() { + @Override + public void onMessage(ClientMessage message) { + assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + IntegrationTestLogger.LOGGER.info("Slow consumer detected!"); + assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); + assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT)); + if (isNetty) { + assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1")); + } else { + assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS)); + } + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME)); + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME)); + assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME)); + try { + message.acknowledge(); + } catch (ActiveMQException e) { + e.printStackTrace(); + } + notifLatch.countDown(); + } + }); + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + for (int i = 0; i < numMessages; i++) { + ClientMessage msg = consumer.receive(1000); + assertNotNull(msg); + IntegrationTestLogger.LOGGER.info("Received message."); + msg.acknowledge(); + session.commit(); + Thread.sleep(100); + } + + assertFalse(notifLatch.await(3, TimeUnit.SECONDS)); + } + @Test public void testSlowConsumerSpared() throws Exception { ClientSessionFactory sf = createSessionFactory(locator);