ARTEMIS-913 Slow consumer detection not working when paging

This commit is contained in:
Clebert Suconic 2017-01-09 12:34:13 -05:00
parent ced0e9c861
commit 9f9ffc1ff5
3 changed files with 60 additions and 121 deletions

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSess
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@ -445,6 +446,10 @@ public abstract class PacketDecoder implements Serializable {
packet = new CheckFailoverReplyMessage(); packet = new CheckFailoverReplyMessage();
break; break;
} }
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
packet = new DisconnectConsumerWithKillMessage();
break;
}
default: { default: {
throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType); throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType);
} }

View File

@ -2813,12 +2813,13 @@ public class QueueImpl implements Queue {
@Override @Override
public float getRate() { public float getRate() {
long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0) { if (timeSlice == 0) {
messagesAddedSnapshot.getAndSet(messagesAdded); messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f; return 0.0f;
} }
return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
} }
// Inner classes // Inner classes

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -37,15 +36,14 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -55,16 +53,18 @@ import org.junit.runners.Parameterized;
public class SlowConsumerTest extends ActiveMQTestBase { public class SlowConsumerTest extends ActiveMQTestBase {
private boolean isNetty = false; private boolean isNetty = false;
private boolean isPaging = false;
// this will ensure that all tests in this class are run twice, // this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false" // once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "isNetty={0}") @Parameterized.Parameters(name = "netty={0}, paging={1}")
public static Collection getParameters() { public static Collection getParameters() {
return Arrays.asList(new Object[][]{{true}, {false}}); return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}});
} }
public SlowConsumerTest(boolean isNetty) { public SlowConsumerTest(boolean isNetty, boolean isPaging) {
this.isNetty = isNetty; this.isNetty = isNetty;
this.isPaging = isPaging;
} }
private ActiveMQServer server; private ActiveMQServer server;
@ -78,14 +78,30 @@ public class SlowConsumerTest extends ActiveMQTestBase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
server = createServer(false, isNetty); server = createServer(true, isNetty);
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL); AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
if (isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
} else {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
addressSettings.setPageSizeBytes(1024);
}
server.start(); server.start();
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
locator = createFactory(isNetty); locator = createFactory(isNetty);
} }
@ -95,10 +111,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientSession session = addClientSession(sf.createSession(false, true, true, false)); ClientSession session = addClientSession(sf.createSession(false, true, true, false));
session.createQueue(QUEUE, QUEUE, null, false);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 25; final int numMessages = 25;
for (int i = 0; i < numMessages; i++) { for (int i = 0; i < numMessages; i++) {
@ -118,36 +134,12 @@ public class SlowConsumerTest extends ActiveMQTestBase {
} }
} }
@Test private void assertPaging() throws Exception {
public void testDisableSlowConsumerReconnectWithKilled() throws Exception { Queue queue = server.locateQueue(QUEUE);
ClientSessionFactory sf = createSessionFactory(locator); if (isPaging) {
Assert.assertTrue(queue.getPageSubscription().isPaging());
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
session.createQueue(QUEUE, QUEUE, null, false);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 25;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
Thread.sleep(3000);
RemotingService service = server.getRemotingService();
Set<RemotingConnection> connections = service.getConnections();
assertTrue(connections.isEmpty());
if (sf instanceof ClientSessionFactoryImpl) {
int reconnectAttemps = ((ClientSessionFactoryImpl) sf).getReconnectAttempts();
assertEquals(0, reconnectAttemps);
} else { } else {
fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl"); Assert.assertFalse(queue.getPageSubscription().isPaging());
} }
} }
@ -158,13 +150,20 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientSession session = addClientSession(sf.createSession(false, true, true, false)); ClientSession session = addClientSession(sf.createSession(false, true, true, false));
session.createQueue(QUEUE, QUEUE, null, false); AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
if (!isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
}
server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
assertPaging();
ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 25; final int numMessages = 25;
@ -207,76 +206,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start(); session.start();
assertTrue(notifLatch.await(3, TimeUnit.SECONDS)); assertTrue(notifLatch.await(15, TimeUnit.SECONDS));
}
@Test
public void testSlowConsumerWithPreAckNotification() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, true));
session.createQueue(QUEUE, QUEUE, null, false);
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 25;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
SimpleString notifQueue = RandomUtil.randomSimpleString();
session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
final CountDownLatch notifLatch = new CountDownLatch(1);
notifConsumer.setMessageHandler(new MessageHandler() {
@Override
public void onMessage(ClientMessage message) {
assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
if (isNetty) {
assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
} else {
assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
}
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
try {
message.acknowledge();
} catch (ActiveMQException e) {
e.printStackTrace();
}
notifLatch.countDown();
}
});
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
for (int i = 0; i < numMessages; i++) {
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
IntegrationTestLogger.LOGGER.info("Received message.");
msg.acknowledge();
session.commit();
Thread.sleep(100);
}
assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
} }
@Test @Test
@ -285,8 +215,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientSession session = addClientSession(sf.createSession(true, true)); ClientSession session = addClientSession(sf.createSession(true, true));
session.createQueue(QUEUE, QUEUE, null, false);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 5; final int numMessages = 5;
@ -295,6 +223,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
producer.send(createTextMessage(session, "m" + i)); producer.send(createTextMessage(session, "m" + i));
} }
assertPaging();
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start(); session.start();
@ -315,8 +245,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
final ClientSession producerSession = addClientSession(sf.createSession(true, true)); final ClientSession producerSession = addClientSession(sf.createSession(true, true));
session.createQueue(QUEUE, QUEUE, null, false);
final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE)); final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE));
final AtomicLong messagesProduced = new AtomicLong(0); final AtomicLong messagesProduced = new AtomicLong(0);
@ -356,6 +284,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
t.start(); t.start();
assertPaging();
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start(); session.start();
@ -382,7 +312,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
SimpleString queueName2 = new SimpleString("Q2"); SimpleString queueName2 = new SimpleString("Q2");
SimpleString queueName = new SimpleString("Q"); SimpleString queueName = new SimpleString("Q");
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL); AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);