ARTEMIS-921 Fixing Slow Consumer when multiple consumers on same queue
This commit is contained in:
parent
90cf239829
commit
599aaa5345
|
@ -3132,7 +3132,20 @@ public class QueueImpl implements Queue {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
|
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
|
||||||
}
|
}
|
||||||
for (Consumer consumer : getConsumers()) {
|
|
||||||
|
Set<Consumer> consumersSet = getConsumers();
|
||||||
|
|
||||||
|
if (consumersSet.size() == 0) {
|
||||||
|
logger.debug("There are no consumers, no need to check slow consumer's rate");
|
||||||
|
return;
|
||||||
|
} else if (queueRate < (threshold * consumersSet.size())) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Consumer consumer : consumersSet) {
|
||||||
if (consumer instanceof ServerConsumerImpl) {
|
if (consumer instanceof ServerConsumerImpl) {
|
||||||
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
|
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
|
||||||
float consumerRate = serverConsumer.getRate();
|
float consumerRate = serverConsumer.getRate();
|
||||||
|
@ -3159,9 +3172,6 @@ public class QueueImpl implements Queue {
|
||||||
connection.killMessage(server.getNodeID());
|
connection.killMessage(server.getNodeID());
|
||||||
remotingService.removeConnection(connection.getID());
|
remotingService.removeConnection(connection.getID());
|
||||||
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
|
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
|
||||||
//break once a consumer gets killed. This can prevent all
|
|
||||||
//consumers to this queue get killed all at once.
|
|
||||||
break;
|
|
||||||
} else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
|
} else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
|
||||||
TypedProperties props = new TypedProperties();
|
TypedProperties props = new TypedProperties();
|
||||||
|
|
||||||
|
|
|
@ -1,261 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.tests.integration.client;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|
||||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
|
||||||
import org.apache.activemq.artemis.utils.TimeUtils;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
public class MultipleSlowConsumerTest extends ActiveMQTestBase {
|
|
||||||
|
|
||||||
private int checkPeriod = 3;
|
|
||||||
private int threshold = 1;
|
|
||||||
|
|
||||||
private ActiveMQServer server;
|
|
||||||
|
|
||||||
private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");
|
|
||||||
|
|
||||||
private ServerLocator locator;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
@Override
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
|
|
||||||
server = createServer(true, true);
|
|
||||||
|
|
||||||
AddressSettings addressSettings = new AddressSettings();
|
|
||||||
addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
|
|
||||||
addressSettings.setSlowConsumerThreshold(threshold);
|
|
||||||
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
|
||||||
|
|
||||||
server.start();
|
|
||||||
|
|
||||||
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
|
|
||||||
|
|
||||||
server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
|
|
||||||
|
|
||||||
locator = createFactory(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This test creates 3 consumers on one queue. A producer sends
|
|
||||||
* messages at a rate of 2 mesages per second. Each consumer
|
|
||||||
* consumes messages at rate of 1 message per second. The slow
|
|
||||||
* consumer threshold is 1 message per second.
|
|
||||||
* Based on the above settings, at least one of the consumers
|
|
||||||
* will be removed during the test, but at least one of the
|
|
||||||
* consumers will remain and all messages will be received.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testMultipleConsumersOneQueue() throws Exception {
|
|
||||||
locator.setAckBatchSize(0);
|
|
||||||
|
|
||||||
ClientSessionFactory sf1 = createSessionFactory(locator);
|
|
||||||
ClientSessionFactory sf2 = createSessionFactory(locator);
|
|
||||||
ClientSessionFactory sf3 = createSessionFactory(locator);
|
|
||||||
ClientSessionFactory sf4 = createSessionFactory(locator);
|
|
||||||
|
|
||||||
final int messages = 10;
|
|
||||||
|
|
||||||
FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages);
|
|
||||||
|
|
||||||
final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
|
|
||||||
final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
|
|
||||||
|
|
||||||
consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1));
|
|
||||||
consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2));
|
|
||||||
consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3));
|
|
||||||
|
|
||||||
try {
|
|
||||||
producer.start(threshold * 1000 / 2);
|
|
||||||
|
|
||||||
for (FixedRateConsumer consumer : consumers) {
|
|
||||||
consumer.start(threshold * 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
//check at least one consumer is killed
|
|
||||||
//but at least one survived
|
|
||||||
//and all messages are received.
|
|
||||||
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3));
|
|
||||||
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
|
|
||||||
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
|
|
||||||
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
|
|
||||||
} finally {
|
|
||||||
producer.stopRunning();
|
|
||||||
for (FixedRateConsumer consumer : consumers) {
|
|
||||||
consumer.stopRunning();
|
|
||||||
}
|
|
||||||
System.out.println("***report messages received: " + receivedMessages.size());
|
|
||||||
System.out.println("***consumers left: " + consumers.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class FixedRateProducer extends FixedRateClient {
|
|
||||||
|
|
||||||
int messages;
|
|
||||||
ClientProducer producer;
|
|
||||||
|
|
||||||
FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
|
|
||||||
super(sf, queue);
|
|
||||||
this.messages = messages;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void prepareWork() throws ActiveMQException {
|
|
||||||
super.prepareWork();
|
|
||||||
this.producer = session.createProducer(queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doWork(int count) throws Exception {
|
|
||||||
|
|
||||||
if (count < messages) {
|
|
||||||
ClientMessage m = createTextMessage(session, "msg" + count);
|
|
||||||
producer.send(m);
|
|
||||||
System.out.println("producer sent a message " + count);
|
|
||||||
} else {
|
|
||||||
stopRunning();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class FixedRateConsumer extends FixedRateClient {
|
|
||||||
|
|
||||||
Set<FixedRateConsumer> consumers;
|
|
||||||
ClientConsumer consumer;
|
|
||||||
Set<ClientMessage> receivedMessages;
|
|
||||||
int id;
|
|
||||||
|
|
||||||
FixedRateConsumer(ClientSessionFactory sf, SimpleString queue,
|
|
||||||
Set<FixedRateConsumer> consumers, Set<ClientMessage> receivedMessages,
|
|
||||||
int id) throws ActiveMQException {
|
|
||||||
super(sf, queue);
|
|
||||||
this.consumers = consumers;
|
|
||||||
this.receivedMessages = receivedMessages;
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void prepareWork() throws ActiveMQException {
|
|
||||||
super.prepareWork();
|
|
||||||
this.consumer = session.createConsumer(queue);
|
|
||||||
this.session.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doWork(int count) throws Exception {
|
|
||||||
ClientMessage m = this.consumer.receive(rate);
|
|
||||||
System.out.println("consumer " + id + " got m: " + m);
|
|
||||||
if (m != null) {
|
|
||||||
receivedMessages.add(m);
|
|
||||||
m.acknowledge();
|
|
||||||
System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void handleError(int count, Exception e) {
|
|
||||||
System.err.println("Got error receiving message " + count + " remove self " + this.id);
|
|
||||||
consumers.remove(this);
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private abstract class FixedRateClient extends Thread {
|
|
||||||
|
|
||||||
protected ClientSessionFactory sf;
|
|
||||||
protected SimpleString queue;
|
|
||||||
protected ClientSession session;
|
|
||||||
protected int rate;
|
|
||||||
protected volatile boolean working;
|
|
||||||
|
|
||||||
FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException {
|
|
||||||
this.sf = sf;
|
|
||||||
this.queue = queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(int rate) {
|
|
||||||
this.rate = rate;
|
|
||||||
working = true;
|
|
||||||
start();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void prepareWork() throws ActiveMQException {
|
|
||||||
this.session = addClientSession(sf.createSession(true, true));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
prepareWork();
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
System.out.println("got error in prepareWork(), aborting...");
|
|
||||||
e.printStackTrace();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
int count = 0;
|
|
||||||
while (working) {
|
|
||||||
try {
|
|
||||||
doWork(count);
|
|
||||||
Thread.sleep(rate);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (Exception e) {
|
|
||||||
System.err.println(this + " got exception ");
|
|
||||||
e.printStackTrace();
|
|
||||||
handleError(count, e);
|
|
||||||
working = false;
|
|
||||||
} finally {
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void doWork(int count) throws Exception;
|
|
||||||
|
|
||||||
protected void handleError(int count, Exception e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopRunning() {
|
|
||||||
working = false;
|
|
||||||
interrupt();
|
|
||||||
try {
|
|
||||||
join();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -42,7 +43,10 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.TimeUtils;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -52,6 +56,9 @@ import org.junit.runners.Parameterized;
|
||||||
@RunWith(value = Parameterized.class)
|
@RunWith(value = Parameterized.class)
|
||||||
public class SlowConsumerTest extends ActiveMQTestBase {
|
public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
|
||||||
|
|
||||||
|
int threshold = 10;
|
||||||
private boolean isNetty = false;
|
private boolean isNetty = false;
|
||||||
private boolean isPaging = false;
|
private boolean isPaging = false;
|
||||||
|
|
||||||
|
@ -82,7 +89,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
AddressSettings addressSettings = new AddressSettings();
|
AddressSettings addressSettings = new AddressSettings();
|
||||||
addressSettings.setSlowConsumerCheckPeriod(1);
|
addressSettings.setSlowConsumerCheckPeriod(1);
|
||||||
addressSettings.setSlowConsumerThreshold(10);
|
addressSettings.setSlowConsumerThreshold(threshold);
|
||||||
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
|
||||||
|
|
||||||
if (isPaging) {
|
if (isPaging) {
|
||||||
|
@ -347,4 +354,212 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
|
assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test creates 3 consumers on one queue. A producer sends
|
||||||
|
* messages at a rate of 2 mesages per second. Each consumer
|
||||||
|
* consumes messages at rate of 1 message per second. The slow
|
||||||
|
* consumer threshold is 1 message per second.
|
||||||
|
* Based on the above settings, at least one of the consumers
|
||||||
|
* will be removed during the test, but at least one of the
|
||||||
|
* consumers will remain and all messages will be received.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultipleConsumersOneQueue() throws Exception {
|
||||||
|
locator.setAckBatchSize(0);
|
||||||
|
|
||||||
|
Queue queue = server.locateQueue(QUEUE);
|
||||||
|
|
||||||
|
ClientSessionFactory sf1 = createSessionFactory(locator);
|
||||||
|
ClientSessionFactory sf2 = createSessionFactory(locator);
|
||||||
|
ClientSessionFactory sf3 = createSessionFactory(locator);
|
||||||
|
ClientSessionFactory sf4 = createSessionFactory(locator);
|
||||||
|
|
||||||
|
final int messages = 10 * threshold;
|
||||||
|
|
||||||
|
FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages);
|
||||||
|
|
||||||
|
final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
|
||||||
|
final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
|
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1));
|
||||||
|
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2));
|
||||||
|
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3));
|
||||||
|
|
||||||
|
try {
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (FixedRateConsumer consumer : consumers) {
|
||||||
|
consumer.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.join(10000);
|
||||||
|
|
||||||
|
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
|
||||||
|
|
||||||
|
Assert.assertEquals(3, queue.getConsumerCount());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
producer.stopRunning();
|
||||||
|
Assert.assertFalse(producer.failed);
|
||||||
|
for (FixedRateConsumer consumer : consumers) {
|
||||||
|
consumer.stopRunning();
|
||||||
|
Assert.assertFalse(consumer.failed);
|
||||||
|
}
|
||||||
|
logger.debug("***report messages received: " + receivedMessages.size());
|
||||||
|
logger.debug("***consumers left: " + consumers.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class FixedRateProducer extends FixedRateClient {
|
||||||
|
|
||||||
|
int messages;
|
||||||
|
ClientProducer producer;
|
||||||
|
|
||||||
|
FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
|
||||||
|
super(sf, queue, rate);
|
||||||
|
this.messages = messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void prepareWork() throws ActiveMQException {
|
||||||
|
super.prepareWork();
|
||||||
|
this.producer = session.createProducer(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doWork(int count) throws Exception {
|
||||||
|
|
||||||
|
if (count < messages) {
|
||||||
|
ClientMessage m = createTextMessage(session, "msg" + count);
|
||||||
|
producer.send(m);
|
||||||
|
logger.debug("producer sent a message " + count);
|
||||||
|
} else {
|
||||||
|
this.working = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Producer";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class FixedRateConsumer extends FixedRateClient {
|
||||||
|
|
||||||
|
Set<FixedRateConsumer> consumers;
|
||||||
|
ClientConsumer consumer;
|
||||||
|
final Set<ClientMessage> receivedMessages;
|
||||||
|
int id;
|
||||||
|
|
||||||
|
FixedRateConsumer(int rate,
|
||||||
|
Set<ClientMessage> receivedMessages,
|
||||||
|
ClientSessionFactory sf,
|
||||||
|
SimpleString queue,
|
||||||
|
int id) throws ActiveMQException {
|
||||||
|
super(sf, queue, rate);
|
||||||
|
this.id = id;
|
||||||
|
this.receivedMessages = receivedMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void prepareWork() throws ActiveMQException {
|
||||||
|
super.prepareWork();
|
||||||
|
this.consumer = session.createConsumer(queue);
|
||||||
|
this.session.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doWork(int count) throws Exception {
|
||||||
|
ClientMessage m = this.consumer.receive(1000);
|
||||||
|
logger.debug("consumer " + id + " got m: " + m);
|
||||||
|
if (m != null) {
|
||||||
|
receivedMessages.add(m);
|
||||||
|
m.acknowledge();
|
||||||
|
logger.debug(" consumer " + id + " acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handleError(int count, Exception e) {
|
||||||
|
failed = true;
|
||||||
|
System.err.println("Got error receiving message " + count + " remove self " + this.id);
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Consumer " + id;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private abstract class FixedRateClient extends Thread {
|
||||||
|
|
||||||
|
protected ClientSessionFactory sf;
|
||||||
|
protected SimpleString queue;
|
||||||
|
protected ClientSession session;
|
||||||
|
protected final int sleepTime;
|
||||||
|
protected volatile boolean working;
|
||||||
|
boolean failed;
|
||||||
|
|
||||||
|
FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException {
|
||||||
|
this.sf = sf;
|
||||||
|
this.queue = queue;
|
||||||
|
this.sleepTime = 1000 / rate;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void prepareWork() throws ActiveMQException {
|
||||||
|
this.session = addClientSession(sf.createSession(true, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
working = true;
|
||||||
|
try {
|
||||||
|
prepareWork();
|
||||||
|
} catch (ActiveMQException e) {
|
||||||
|
logger.debug("got error in prepareWork(), aborting...");
|
||||||
|
e.printStackTrace();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int count = 0;
|
||||||
|
while (working) {
|
||||||
|
try {
|
||||||
|
doWork(count);
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// expected, nothing to be done
|
||||||
|
} catch (Exception e) {
|
||||||
|
failed = true;
|
||||||
|
handleError(count, e);
|
||||||
|
working = false;
|
||||||
|
} finally {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doWork(int count) throws Exception;
|
||||||
|
|
||||||
|
protected void handleError(int count, Exception e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopRunning() {
|
||||||
|
working = false;
|
||||||
|
try {
|
||||||
|
session.close();
|
||||||
|
this.interrupt();
|
||||||
|
join(5000);
|
||||||
|
if (isAlive()) {
|
||||||
|
fail("Interrupt is not working on Working Thread");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (Exception e) {
|
||||||
|
handleError(0, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue