This closes #954
This commit is contained in:
commit
57de21875a
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSess
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
||||
|
@ -445,6 +446,10 @@ public abstract class PacketDecoder implements Serializable {
|
|||
packet = new CheckFailoverReplyMessage();
|
||||
break;
|
||||
}
|
||||
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
|
||||
packet = new DisconnectConsumerWithKillMessage();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.invalidType(packetType);
|
||||
}
|
||||
|
|
|
@ -2813,12 +2813,13 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public float getRate() {
|
||||
long locaMessageAdded = getMessagesAdded();
|
||||
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
|
||||
if (timeSlice == 0) {
|
||||
messagesAddedSnapshot.getAndSet(messagesAdded);
|
||||
messagesAddedSnapshot.getAndSet(locaMessageAdded);
|
||||
return 0.0f;
|
||||
}
|
||||
return BigDecimal.valueOf((messagesAdded - messagesAddedSnapshot.getAndSet(messagesAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
||||
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
||||
}
|
||||
|
||||
// Inner classes
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.client;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -37,15 +36,14 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -55,16 +53,18 @@ import org.junit.runners.Parameterized;
|
|||
public class SlowConsumerTest extends ActiveMQTestBase {
|
||||
|
||||
private boolean isNetty = false;
|
||||
private boolean isPaging = false;
|
||||
|
||||
// this will ensure that all tests in this class are run twice,
|
||||
// once with "true" passed to the class' constructor and once with "false"
|
||||
@Parameterized.Parameters(name = "isNetty={0}")
|
||||
@Parameterized.Parameters(name = "netty={0}, paging={1}")
|
||||
public static Collection getParameters() {
|
||||
return Arrays.asList(new Object[][]{{true}, {false}});
|
||||
return Arrays.asList(new Object[][]{{true, false}, {false, false}, {true, true}, {false, true}});
|
||||
}
|
||||
|
||||
public SlowConsumerTest(boolean isNetty) {
|
||||
public SlowConsumerTest(boolean isNetty, boolean isPaging) {
|
||||
this.isNetty = isNetty;
|
||||
this.isPaging = isPaging;
|
||||
}
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
@ -78,14 +78,30 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
server = createServer(false, isNetty);
|
||||
server = createServer(true, isNetty);
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
addressSettings.setSlowConsumerCheckPeriod(1);
|
||||
addressSettings.setSlowConsumerThreshold(10);
|
||||
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
||||
|
||||
if (isPaging) {
|
||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
||||
addressSettings.setMaxSizeBytes(10 * 1024);
|
||||
addressSettings.setPageSizeBytes(1024);
|
||||
} else {
|
||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
addressSettings.setMaxSizeBytes(-1);
|
||||
addressSettings.setPageSizeBytes(1024);
|
||||
|
||||
}
|
||||
|
||||
server.start();
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
|
||||
|
||||
server.createQueue(QUEUE, QUEUE, null, true, false).getPageSubscription().getPagingStore().startPaging();
|
||||
|
||||
locator = createFactory(isNetty);
|
||||
}
|
||||
|
||||
|
@ -95,10 +111,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||
|
||||
assertPaging();
|
||||
|
||||
final int numMessages = 25;
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
|
@ -118,36 +134,12 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||
|
||||
final int numMessages = 25;
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
producer.send(createTextMessage(session, "m" + i));
|
||||
}
|
||||
|
||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||
session.start();
|
||||
|
||||
Thread.sleep(3000);
|
||||
|
||||
RemotingService service = server.getRemotingService();
|
||||
Set<RemotingConnection> connections = service.getConnections();
|
||||
assertTrue(connections.isEmpty());
|
||||
|
||||
if (sf instanceof ClientSessionFactoryImpl) {
|
||||
int reconnectAttemps = ((ClientSessionFactoryImpl) sf).getReconnectAttempts();
|
||||
assertEquals(0, reconnectAttemps);
|
||||
private void assertPaging() throws Exception {
|
||||
Queue queue = server.locateQueue(QUEUE);
|
||||
if (isPaging) {
|
||||
Assert.assertTrue(queue.getPageSubscription().isPaging());
|
||||
} else {
|
||||
fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
|
||||
Assert.assertFalse(queue.getPageSubscription().isPaging());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,13 +150,20 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
addressSettings.setSlowConsumerCheckPeriod(2);
|
||||
addressSettings.setSlowConsumerThreshold(10);
|
||||
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||
if (!isPaging) {
|
||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||
addressSettings.setMaxSizeBytes(-1);
|
||||
}
|
||||
|
||||
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
|
||||
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
|
||||
|
||||
assertPaging();
|
||||
|
||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||
|
||||
final int numMessages = 25;
|
||||
|
@ -207,76 +206,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||
session.start();
|
||||
|
||||
assertTrue(notifLatch.await(3, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowConsumerWithPreAckNotification() throws Exception {
|
||||
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = addClientSession(sf.createSession(false, true, true, true));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
|
||||
|
||||
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
|
||||
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
|
||||
|
||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||
|
||||
final int numMessages = 25;
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
producer.send(createTextMessage(session, "m" + i));
|
||||
}
|
||||
|
||||
SimpleString notifQueue = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
|
||||
|
||||
ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
|
||||
|
||||
final CountDownLatch notifLatch = new CountDownLatch(1);
|
||||
|
||||
notifConsumer.setMessageHandler(new MessageHandler() {
|
||||
@Override
|
||||
public void onMessage(ClientMessage message) {
|
||||
assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
|
||||
IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
|
||||
assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
|
||||
assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
|
||||
if (isNetty) {
|
||||
assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
|
||||
} else {
|
||||
assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
|
||||
}
|
||||
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
|
||||
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
|
||||
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
|
||||
try {
|
||||
message.acknowledge();
|
||||
} catch (ActiveMQException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
notifLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage msg = consumer.receive(1000);
|
||||
assertNotNull(msg);
|
||||
IntegrationTestLogger.LOGGER.info("Received message.");
|
||||
msg.acknowledge();
|
||||
session.commit();
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
|
||||
assertTrue(notifLatch.await(15, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -285,8 +215,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = addClientSession(sf.createSession(true, true));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
||||
|
||||
final int numMessages = 5;
|
||||
|
@ -295,6 +223,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
producer.send(createTextMessage(session, "m" + i));
|
||||
}
|
||||
|
||||
assertPaging();
|
||||
|
||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||
session.start();
|
||||
|
||||
|
@ -315,8 +245,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
final ClientSession producerSession = addClientSession(sf.createSession(true, true));
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, false);
|
||||
|
||||
final ClientProducer producer = addClientProducer(producerSession.createProducer(QUEUE));
|
||||
|
||||
final AtomicLong messagesProduced = new AtomicLong(0);
|
||||
|
@ -356,6 +284,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
|
||||
t.start();
|
||||
|
||||
assertPaging();
|
||||
|
||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
||||
session.start();
|
||||
|
||||
|
@ -382,7 +312,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
|||
SimpleString queueName2 = new SimpleString("Q2");
|
||||
SimpleString queueName = new SimpleString("Q");
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(10).setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
addressSettings.setSlowConsumerCheckPeriod(2);
|
||||
addressSettings.setSlowConsumerThreshold(10);
|
||||
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
|
||||
|
||||
|
|
Loading…
Reference in New Issue