ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
The deliver loop won't give up trying to deliver messages when back-pressure kicks in (credits and/or TCP) if msg grouping is used and there are many consumers registered: this change will allow the loop to exit by instructing the logic that the group consumer is the only consumer to check.
This commit is contained in:
parent
99469b1cff
commit
8dd0e9472f
|
@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupMessageWithManyConsumers() throws Exception {
|
||||||
|
final CountDownLatch firstMessageHandled = new CountDownLatch(1);
|
||||||
|
final CountDownLatch finished = new CountDownLatch(2);
|
||||||
|
final Consumer groupConsumer = new FakeConsumer() {
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized HandleStatus handle(MessageReference reference) {
|
||||||
|
if (count == 0) {
|
||||||
|
//the first message is handled and will be used to determine this consumer
|
||||||
|
//to be the group consumer
|
||||||
|
count++;
|
||||||
|
firstMessageHandled.countDown();
|
||||||
|
return HandleStatus.HANDLED;
|
||||||
|
} else if (count <= 2) {
|
||||||
|
//the next two attempts to send the second message will be done
|
||||||
|
//attempting a direct delivery and an async one after that
|
||||||
|
count++;
|
||||||
|
finished.countDown();
|
||||||
|
return HandleStatus.BUSY;
|
||||||
|
} else {
|
||||||
|
//this shouldn't happen, because the last attempt to deliver
|
||||||
|
//the second message should have stop the delivery loop:
|
||||||
|
//it will succeed just to let the message being handled and
|
||||||
|
//reduce the message count to 0
|
||||||
|
return HandleStatus.HANDLED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Consumer noConsumer = new FakeConsumer() {
|
||||||
|
@Override
|
||||||
|
public synchronized HandleStatus handle(MessageReference reference) {
|
||||||
|
Assert.fail("this consumer isn't allowed to consume any message");
|
||||||
|
throw new AssertionError();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
|
||||||
|
null, null, false, true, false,
|
||||||
|
scheduledExecutor, null, null, null,
|
||||||
|
ArtemisExecutor.delegate(executor), null, null);
|
||||||
|
queue.addConsumer(groupConsumer);
|
||||||
|
queue.addConsumer(noConsumer);
|
||||||
|
final MessageReference firstMessageReference = generateReference(queue, 1);
|
||||||
|
final SimpleString groupName = SimpleString.toSimpleString("group");
|
||||||
|
firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
|
||||||
|
final MessageReference secondMessageReference = generateReference(queue, 2);
|
||||||
|
secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
|
||||||
|
queue.addTail(firstMessageReference, true);
|
||||||
|
Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
|
||||||
|
Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
|
||||||
|
queue.addTail(secondMessageReference, true);
|
||||||
|
final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
|
||||||
|
Assert.assertTrue(atLeastTwoDeliverAttempts);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
private QueueImpl getNonDurableQueue() {
|
private QueueImpl getNonDurableQueue() {
|
||||||
return getQueue(QueueImplTest.queue1, false, false, null);
|
return getQueue(QueueImplTest.queue1, false, false, null);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue