From 9f9ffc1ff59de16e356c3dd2f482535a017d4787 Mon Sep 17 00:00:00 2001
From: Clebert Suconic <clebertsuconic@apache.org>
Date: Mon, 9 Jan 2017 12:34:13 -0500
Subject: [PATCH] ARTEMIS-913 Slow consumer detection not working when paging

---
 .../protocol/core/impl/PacketDecoder.java     |   5 +
 .../artemis/core/server/impl/QueueImpl.java   |   5 +-
 .../integration/client/SlowConsumerTest.java  | 171 ++++++------------
 3 files changed, 60 insertions(+), 121 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index d7f243a38b..e39e16baa1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSess
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -445,6 +446,10 @@ public abstract class PacketDecoder implements Serializable {
             packet = new CheckFailoverReplyMessage();
             break;
          }
+         case PacketImpl.DISCONNECT_CONSUMER_KILL: {
+            packet = new DisconnectConsumerWithKillMessage();
+            break;
+         }
          default: {
             throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 3caa22c028..a74b0fe333 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2813,12 +2813,13 @@ public class QueueImpl implements Queue {
 
    @Override
    public float getRate() {
+      long locaMessageAdded = getMessagesAdded();
       float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
       if (timeSlice == 0) {
-         messagesAddedSnapshot.getAndSet(messagesAdded);
+         messagesAddedSnapshot.getAndSet(locaMessageAdded);
          return 0.0f;
       }
-      return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
+      return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
    }
 
    // Inner classes
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 3643f77991..547577817e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.client;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -37,15 +36,14 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
-import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -55,16 +53,18 @@ import org.junit.runners.Parameterized;
 public class SlowConsumerTest extends ActiveMQTestBase {
 
    private boolean isNetty = false;
+   private boolean isPaging = false;
 
    // this will ensure that all tests in this class are run twice,
    // once with "true" passed to the class' constructor and once with "false"
-   @Parameterized.Parameters(name = "isNetty={0}")
+   @Parameterized.Parameters(name = "netty={0}, paging={1}")
    public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{true}, {false}});
+      return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}});
    }
 
-   public SlowConsumerTest(boolean isNetty) {
+   public SlowConsumerTest(boolean isNetty, boolean isPaging) {
       this.isNetty = isNetty;
+      this.isPaging = isPaging;
    }
 
    private ActiveMQServer server;
@@ -78,14 +78,30 @@ public class SlowConsumerTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = createServer(false, isNetty);
+      server = createServer(true, isNetty);
 
-      AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(1);
+      addressSettings.setSlowConsumerThreshold(10);
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+      if (isPaging) {
+         addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+         addressSettings.setMaxSizeBytes(10 * 1024);
+         addressSettings.setPageSizeBytes(1024);
+      } else {
+         addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+         addressSettings.setMaxSizeBytes(-1);
+         addressSettings.setPageSizeBytes(1024);
+
+      }
 
       server.start();
 
       server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
 
+      server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
+
       locator = createFactory(isNetty);
    }
 
@@ -95,10 +111,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = addClientSession(sf.createSession(false, true, true, false));
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
 
+      assertPaging();
+
       final int numMessages = 25;
 
       for (int i = 0; i < numMessages; i++) {
@@ -118,36 +134,12 @@ public class SlowConsumerTest extends ActiveMQTestBase {
       }
    }
 
-   @Test
-   public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
-
-      final int numMessages = 25;
-
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(createTextMessage(session, "m" + i));
-      }
-
-      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
-      session.start();
-
-      Thread.sleep(3000);
-
-      RemotingService service = server.getRemotingService();
-      Set<RemotingConnection> connections = service.getConnections();
-      assertTrue(connections.isEmpty());
-
-      if (sf instanceof ClientSessionFactoryImpl) {
-         int reconnectAttemps = ((ClientSessionFactoryImpl) sf).getReconnectAttempts();
-         assertEquals(0, reconnectAttemps);
+   private void assertPaging() throws Exception {
+      Queue queue = server.locateQueue(QUEUE);
+      if (isPaging) {
+         Assert.assertTrue(queue.getPageSubscription().isPaging());
       } else {
-         fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
+         Assert.assertFalse(queue.getPageSubscription().isPaging());
       }
    }
 
@@ -158,13 +150,20 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = addClientSession(sf.createSession(false, true, true, false));
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThreshold(10);
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+      if (!isPaging) {
+         addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+         addressSettings.setMaxSizeBytes(-1);
+      }
 
       server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
       server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
 
+      assertPaging();
+
       ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
 
       final int numMessages = 25;
@@ -207,76 +206,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
       ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
       session.start();
 
-      assertTrue(notifLatch.await(3, TimeUnit.SECONDS));
-   }
-
-   @Test
-   public void testSlowConsumerWithPreAckNotification() throws Exception {
-
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = addClientSession(sf.createSession(false, true, true, true));
-
-      session.createQueue(QUEUE, QUEUE, null, false);
-
-      AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
-
-      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
-      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
-
-      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
-
-      final int numMessages = 25;
-
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(createTextMessage(session, "m" + i));
-      }
-
-      SimpleString notifQueue = RandomUtil.randomSimpleString();
-
-      session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
-
-      ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
-
-      final CountDownLatch notifLatch = new CountDownLatch(1);
-
-      notifConsumer.setMessageHandler(new MessageHandler() {
-         @Override
-         public void onMessage(ClientMessage message) {
-            assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
-            IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
-            assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
-            assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
-            if (isNetty) {
-               assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
-            } else {
-               assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
-            }
-            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
-            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
-            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
-            try {
-               message.acknowledge();
-            } catch (ActiveMQException e) {
-               e.printStackTrace();
-            }
-            notifLatch.countDown();
-         }
-      });
-
-      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
-      session.start();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage msg = consumer.receive(1000);
-         assertNotNull(msg);
-         IntegrationTestLogger.LOGGER.info("Received message.");
-         msg.acknowledge();
-         session.commit();
-         Thread.sleep(100);
-      }
-
-      assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
+      assertTrue(notifLatch.await(15, TimeUnit.SECONDS));
    }
 
    @Test
@@ -285,8 +215,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
       ClientSession session = addClientSession(sf.createSession(true, true));
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
 
       final int numMessages = 5;
@@ -295,6 +223,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
          producer.send(createTextMessage(session, "m" + i));
       }
 
+      assertPaging();
+
       ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
       session.start();
 
@@ -315,8 +245,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
       final ClientSession producerSession = addClientSession(sf.createSession(true, true));
 
-      session.createQueue(QUEUE, QUEUE, null, false);
-
       final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE));
 
       final AtomicLong messagesProduced = new AtomicLong(0);
@@ -356,6 +284,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
       t.start();
 
+      assertPaging();
+
       ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
       session.start();
 
@@ -382,7 +312,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
       SimpleString queueName2 = new SimpleString("Q2");
       SimpleString queueName = new SimpleString("Q");
 
-      AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setSlowConsumerCheckPeriod(2);
+      addressSettings.setSlowConsumerThreshold(10);
+      addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
 
       server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);