This closes #825

This commit is contained in:
Clebert Suconic 2016-10-07 14:47:22 -04:00
commit 736886fc13
2 changed files with 71 additions and 0 deletions

View File

@ -381,6 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// With pre-ack, we ack *before* sending to the client
ref.getQueue().acknowledge(ref);
acks++;
}
if (message.isLargeMessage() && this.supportLargeMessage) {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Before;
@ -209,6 +210,75 @@ public class SlowConsumerTest extends ActiveMQTestBase {
assertTrue(notifLatch.await(3, TimeUnit.SECONDS));
}
@Test
public void testSlowConsumerWithPreAckNotification() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, true));
session.createQueue(QUEUE, QUEUE, null, false);
AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 25;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
SimpleString notifQueue = RandomUtil.randomSimpleString();
session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
final CountDownLatch notifLatch = new CountDownLatch(1);
notifConsumer.setMessageHandler(new MessageHandler() {
@Override
public void onMessage(ClientMessage message) {
assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
if (isNetty) {
assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
} else {
assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
}
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
try {
message.acknowledge();
} catch (ActiveMQException e) {
e.printStackTrace();
}
notifLatch.countDown();
}
});
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
for (int i = 0; i < numMessages; i++) {
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
IntegrationTestLogger.LOGGER.info("Received message.");
msg.acknowledge();
session.commit();
Thread.sleep(100);
}
assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
}
@Test
public void testSlowConsumerSpared() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);