From 90cf2398296d9de83145043c6dc0d8f6a6d4c068 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 16 Jan 2017 22:00:05 +0800 Subject: [PATCH 1/2] ARTEMIS-921 Consumers killed as slow even if overall consuming rate is above threshold --- .../artemis/core/server/impl/QueueImpl.java | 3 + .../client/MultipleSlowConsumerTest.java | 261 ++++++++++++++++++ 2 files changed, 264 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a74b0fe333..87a5bd799c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3159,6 +3159,9 @@ public class QueueImpl implements Queue { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); + //break once a consumer gets killed. This can prevent all + //consumers to this queue get killed all at once. + break; } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { TypedProperties props = new TypedProperties(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java new file mode 100644 index 0000000000..37ae528383 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Set; + +public class MultipleSlowConsumerTest extends ActiveMQTestBase { + + private int checkPeriod = 3; + private int threshold = 1; + + private ActiveMQServer server; + + private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue"); + + private ServerLocator locator; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + server = createServer(true, true); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(checkPeriod); + addressSettings.setSlowConsumerThreshold(threshold); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + server.start(); + + server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false); + + locator = createFactory(true); + } + + /** + * This test creates 3 consumers on one queue. A producer sends + * messages at a rate of 2 mesages per second. Each consumer + * consumes messages at rate of 1 message per second. The slow + * consumer threshold is 1 message per second. + * Based on the above settings, at least one of the consumers + * will be removed during the test, but at least one of the + * consumers will remain and all messages will be received. + */ + @Test + public void testMultipleConsumersOneQueue() throws Exception { + locator.setAckBatchSize(0); + + ClientSessionFactory sf1 = createSessionFactory(locator); + ClientSessionFactory sf2 = createSessionFactory(locator); + ClientSessionFactory sf3 = createSessionFactory(locator); + ClientSessionFactory sf4 = createSessionFactory(locator); + + final int messages = 10; + + FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages); + + final Set consumers = new ConcurrentHashSet<>(); + final Set receivedMessages = new ConcurrentHashSet<>(); + + consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1)); + consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2)); + consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3)); + + try { + producer.start(threshold * 1000 / 2); + + for (FixedRateConsumer consumer : consumers) { + consumer.start(threshold * 1000); + } + + //check at least one consumer is killed + //but at least one survived + //and all messages are received. + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); + } finally { + producer.stopRunning(); + for (FixedRateConsumer consumer : consumers) { + consumer.stopRunning(); + } + System.out.println("***report messages received: " + receivedMessages.size()); + System.out.println("***consumers left: " + consumers.size()); + } + } + + private class FixedRateProducer extends FixedRateClient { + + int messages; + ClientProducer producer; + + FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { + super(sf, queue); + this.messages = messages; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.producer = session.createProducer(queue); + } + + @Override + protected void doWork(int count) throws Exception { + + if (count < messages) { + ClientMessage m = createTextMessage(session, "msg" + count); + producer.send(m); + System.out.println("producer sent a message " + count); + } else { + stopRunning(); + } + } + } + + private class FixedRateConsumer extends FixedRateClient { + + Set consumers; + ClientConsumer consumer; + Set receivedMessages; + int id; + + FixedRateConsumer(ClientSessionFactory sf, SimpleString queue, + Set consumers, Set receivedMessages, + int id) throws ActiveMQException { + super(sf, queue); + this.consumers = consumers; + this.receivedMessages = receivedMessages; + this.id = id; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.consumer = session.createConsumer(queue); + this.session.start(); + } + + @Override + protected void doWork(int count) throws Exception { + ClientMessage m = this.consumer.receive(rate); + System.out.println("consumer " + id + " got m: " + m); + if (m != null) { + receivedMessages.add(m); + m.acknowledge(); + System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size()); + } + } + + @Override + protected void handleError(int count, Exception e) { + System.err.println("Got error receiving message " + count + " remove self " + this.id); + consumers.remove(this); + e.printStackTrace(); + } + + } + + private abstract class FixedRateClient extends Thread { + + protected ClientSessionFactory sf; + protected SimpleString queue; + protected ClientSession session; + protected int rate; + protected volatile boolean working; + + FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException { + this.sf = sf; + this.queue = queue; + } + + public void start(int rate) { + this.rate = rate; + working = true; + start(); + } + + protected void prepareWork() throws ActiveMQException { + this.session = addClientSession(sf.createSession(true, true)); + } + + @Override + public void run() { + try { + prepareWork(); + } catch (ActiveMQException e) { + System.out.println("got error in prepareWork(), aborting..."); + e.printStackTrace(); + return; + } + int count = 0; + while (working) { + try { + doWork(count); + Thread.sleep(rate); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + System.err.println(this + " got exception "); + e.printStackTrace(); + handleError(count, e); + working = false; + } finally { + count++; + } + } + } + + protected abstract void doWork(int count) throws Exception; + + protected void handleError(int count, Exception e) { + } + + public void stopRunning() { + working = false; + interrupt(); + try { + join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} From 599aaa5345bb75a82da00c633575ac62fecf1bd6 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 16 Jan 2017 10:27:27 -0500 Subject: [PATCH 2/2] ARTEMIS-921 Fixing Slow Consumer when multiple consumers on same queue --- .../artemis/core/server/impl/QueueImpl.java | 18 +- .../client/MultipleSlowConsumerTest.java | 261 ------------------ .../integration/client/SlowConsumerTest.java | 217 ++++++++++++++- 3 files changed, 230 insertions(+), 266 deletions(-) delete mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 87a5bd799c..ecc67bfbcb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3132,7 +3132,20 @@ public class QueueImpl implements Queue { if (logger.isDebugEnabled()) { logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } - for (Consumer consumer : getConsumers()) { + + Set consumersSet = getConsumers(); + + if (consumersSet.size() == 0) { + logger.debug("There are no consumers, no need to check slow consumer's rate"); + return; + } else if (queueRate < (threshold * consumersSet.size())) { + if (logger.isDebugEnabled()) { + logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); + } + return; + } + + for (Consumer consumer : consumersSet) { if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); @@ -3159,9 +3172,6 @@ public class QueueImpl implements Queue { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); - //break once a consumer gets killed. This can prevent all - //consumers to this queue get killed all at once. - break; } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { TypedProperties props = new TypedProperties(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java deleted file mode 100644 index 37ae528383..0000000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.client; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ClientConsumer; -import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.client.ClientProducer; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.RoutingType; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.TimeUtils; -import org.junit.Before; -import org.junit.Test; - -import java.util.Set; - -public class MultipleSlowConsumerTest extends ActiveMQTestBase { - - private int checkPeriod = 3; - private int threshold = 1; - - private ActiveMQServer server; - - private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue"); - - private ServerLocator locator; - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - - server = createServer(true, true); - - AddressSettings addressSettings = new AddressSettings(); - addressSettings.setSlowConsumerCheckPeriod(checkPeriod); - addressSettings.setSlowConsumerThreshold(threshold); - addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); - - server.start(); - - server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); - - server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false); - - locator = createFactory(true); - } - - /** - * This test creates 3 consumers on one queue. A producer sends - * messages at a rate of 2 mesages per second. Each consumer - * consumes messages at rate of 1 message per second. The slow - * consumer threshold is 1 message per second. - * Based on the above settings, at least one of the consumers - * will be removed during the test, but at least one of the - * consumers will remain and all messages will be received. - */ - @Test - public void testMultipleConsumersOneQueue() throws Exception { - locator.setAckBatchSize(0); - - ClientSessionFactory sf1 = createSessionFactory(locator); - ClientSessionFactory sf2 = createSessionFactory(locator); - ClientSessionFactory sf3 = createSessionFactory(locator); - ClientSessionFactory sf4 = createSessionFactory(locator); - - final int messages = 10; - - FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages); - - final Set consumers = new ConcurrentHashSet<>(); - final Set receivedMessages = new ConcurrentHashSet<>(); - - consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1)); - consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2)); - consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3)); - - try { - producer.start(threshold * 1000 / 2); - - for (FixedRateConsumer consumer : consumers) { - consumer.start(threshold * 1000); - } - - //check at least one consumer is killed - //but at least one survived - //and all messages are received. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3)); - assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); - assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages)); - assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); - } finally { - producer.stopRunning(); - for (FixedRateConsumer consumer : consumers) { - consumer.stopRunning(); - } - System.out.println("***report messages received: " + receivedMessages.size()); - System.out.println("***consumers left: " + consumers.size()); - } - } - - private class FixedRateProducer extends FixedRateClient { - - int messages; - ClientProducer producer; - - FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { - super(sf, queue); - this.messages = messages; - } - - @Override - protected void prepareWork() throws ActiveMQException { - super.prepareWork(); - this.producer = session.createProducer(queue); - } - - @Override - protected void doWork(int count) throws Exception { - - if (count < messages) { - ClientMessage m = createTextMessage(session, "msg" + count); - producer.send(m); - System.out.println("producer sent a message " + count); - } else { - stopRunning(); - } - } - } - - private class FixedRateConsumer extends FixedRateClient { - - Set consumers; - ClientConsumer consumer; - Set receivedMessages; - int id; - - FixedRateConsumer(ClientSessionFactory sf, SimpleString queue, - Set consumers, Set receivedMessages, - int id) throws ActiveMQException { - super(sf, queue); - this.consumers = consumers; - this.receivedMessages = receivedMessages; - this.id = id; - } - - @Override - protected void prepareWork() throws ActiveMQException { - super.prepareWork(); - this.consumer = session.createConsumer(queue); - this.session.start(); - } - - @Override - protected void doWork(int count) throws Exception { - ClientMessage m = this.consumer.receive(rate); - System.out.println("consumer " + id + " got m: " + m); - if (m != null) { - receivedMessages.add(m); - m.acknowledge(); - System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size()); - } - } - - @Override - protected void handleError(int count, Exception e) { - System.err.println("Got error receiving message " + count + " remove self " + this.id); - consumers.remove(this); - e.printStackTrace(); - } - - } - - private abstract class FixedRateClient extends Thread { - - protected ClientSessionFactory sf; - protected SimpleString queue; - protected ClientSession session; - protected int rate; - protected volatile boolean working; - - FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException { - this.sf = sf; - this.queue = queue; - } - - public void start(int rate) { - this.rate = rate; - working = true; - start(); - } - - protected void prepareWork() throws ActiveMQException { - this.session = addClientSession(sf.createSession(true, true)); - } - - @Override - public void run() { - try { - prepareWork(); - } catch (ActiveMQException e) { - System.out.println("got error in prepareWork(), aborting..."); - e.printStackTrace(); - return; - } - int count = 0; - while (working) { - try { - doWork(count); - Thread.sleep(rate); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e) { - System.err.println(this + " got exception "); - e.printStackTrace(); - handleError(count, e); - working = false; - } finally { - count++; - } - } - } - - protected abstract void doWork(int count) throws Exception; - - protected void handleError(int count, Exception e) { - } - - public void stopRunning() { - working = false; - interrupt(); - try { - join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index 547577817e..88f7c72a59 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client; import java.util.Arrays; import java.util.Collection; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -42,7 +43,10 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,6 +56,9 @@ import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class SlowConsumerTest extends ActiveMQTestBase { + private static final Logger logger = Logger.getLogger(SlowConsumerTest.class); + + int threshold = 10; private boolean isNetty = false; private boolean isPaging = false; @@ -82,7 +89,7 @@ public class SlowConsumerTest extends ActiveMQTestBase { AddressSettings addressSettings = new AddressSettings(); addressSettings.setSlowConsumerCheckPeriod(1); - addressSettings.setSlowConsumerThreshold(10); + addressSettings.setSlowConsumerThreshold(threshold); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); if (isPaging) { @@ -347,4 +354,212 @@ public class SlowConsumerTest extends ActiveMQTestBase { assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED); } } + + /** + * This test creates 3 consumers on one queue. A producer sends + * messages at a rate of 2 mesages per second. Each consumer + * consumes messages at rate of 1 message per second. The slow + * consumer threshold is 1 message per second. + * Based on the above settings, at least one of the consumers + * will be removed during the test, but at least one of the + * consumers will remain and all messages will be received. + */ + @Test + public void testMultipleConsumersOneQueue() throws Exception { + locator.setAckBatchSize(0); + + Queue queue = server.locateQueue(QUEUE); + + ClientSessionFactory sf1 = createSessionFactory(locator); + ClientSessionFactory sf2 = createSessionFactory(locator); + ClientSessionFactory sf3 = createSessionFactory(locator); + ClientSessionFactory sf4 = createSessionFactory(locator); + + final int messages = 10 * threshold; + + FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages); + + final Set consumers = new ConcurrentHashSet<>(); + final Set receivedMessages = new ConcurrentHashSet<>(); + + consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1)); + consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2)); + consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3)); + + try { + producer.start(); + + for (FixedRateConsumer consumer : consumers) { + consumer.start(); + } + + producer.join(10000); + + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages)); + + Assert.assertEquals(3, queue.getConsumerCount()); + + } finally { + producer.stopRunning(); + Assert.assertFalse(producer.failed); + for (FixedRateConsumer consumer : consumers) { + consumer.stopRunning(); + Assert.assertFalse(consumer.failed); + } + logger.debug("***report messages received: " + receivedMessages.size()); + logger.debug("***consumers left: " + consumers.size()); + } + } + + private class FixedRateProducer extends FixedRateClient { + + int messages; + ClientProducer producer; + + FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { + super(sf, queue, rate); + this.messages = messages; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.producer = session.createProducer(queue); + } + + @Override + protected void doWork(int count) throws Exception { + + if (count < messages) { + ClientMessage m = createTextMessage(session, "msg" + count); + producer.send(m); + logger.debug("producer sent a message " + count); + } else { + this.working = false; + } + } + + @Override + public String toString() { + return "Producer"; + } + } + + private class FixedRateConsumer extends FixedRateClient { + + Set consumers; + ClientConsumer consumer; + final Set receivedMessages; + int id; + + FixedRateConsumer(int rate, + Set receivedMessages, + ClientSessionFactory sf, + SimpleString queue, + int id) throws ActiveMQException { + super(sf, queue, rate); + this.id = id; + this.receivedMessages = receivedMessages; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.consumer = session.createConsumer(queue); + this.session.start(); + } + + @Override + protected void doWork(int count) throws Exception { + ClientMessage m = this.consumer.receive(1000); + logger.debug("consumer " + id + " got m: " + m); + if (m != null) { + receivedMessages.add(m); + m.acknowledge(); + logger.debug(" consumer " + id + " acked " + m.getClass().getName() + "now total received: " + receivedMessages.size()); + } + } + + @Override + protected void handleError(int count, Exception e) { + failed = true; + System.err.println("Got error receiving message " + count + " remove self " + this.id); + e.printStackTrace(); + } + + @Override + public String toString() { + return "Consumer " + id; + } + + } + + private abstract class FixedRateClient extends Thread { + + protected ClientSessionFactory sf; + protected SimpleString queue; + protected ClientSession session; + protected final int sleepTime; + protected volatile boolean working; + boolean failed; + + FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException { + this.sf = sf; + this.queue = queue; + this.sleepTime = 1000 / rate; + } + + protected void prepareWork() throws ActiveMQException { + this.session = addClientSession(sf.createSession(true, true)); + } + + @Override + public void run() { + working = true; + try { + prepareWork(); + } catch (ActiveMQException e) { + logger.debug("got error in prepareWork(), aborting..."); + e.printStackTrace(); + return; + } + int count = 0; + while (working) { + try { + doWork(count); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // expected, nothing to be done + } catch (Exception e) { + failed = true; + handleError(count, e); + working = false; + } finally { + count++; + } + } + } + + protected abstract void doWork(int count) throws Exception; + + protected void handleError(int count, Exception e) { + } + + public void stopRunning() { + working = false; + try { + session.close(); + this.interrupt(); + join(5000); + if (isAlive()) { + fail("Interrupt is not working on Working Thread"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + handleError(0, e); + } + } + } + }