https://issues.apache.org/jira/browse/AMQ-5114 - force local removeInfo after connection controll consumer remove command so consumer is always removed from the broker, independent of the consumer state

This commit is contained in:
gtully 2014-03-21 12:21:19 +00:00
parent 9c67f0df71
commit 9bf9e1c052
7 changed files with 147 additions and 81 deletions

View File

@ -30,6 +30,8 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transport.InactivityIOException;
import org.slf4j.Logger;
@ -161,19 +163,44 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
connection.getConnectionId(), subscriptions.size());
}
} else {
// just abort each consumer by telling it to stop
// just abort each consumer
for (Subscription subscription : subscriptions) {
final Subscription subToClose = subscription;
LOG.info("aborting slow consumer: {} for destination:{}",
subscription.getConsumerInfo().getConsumerId(),
subscription.getActiveMQDestination());
// tell the remote consumer to close
try {
ConsumerControl stopConsumer = new ConsumerControl();
stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId());
stopConsumer.setClose(true);
connection.dispatchAsync(stopConsumer);
} catch (Exception e) {
LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId());
LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
}
// force a local remove in case remote is unresponsive
try {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
try {
RemoveInfo removeCommand = subToClose.getConsumerInfo().createRemoveCommand();
if (connection instanceof CommandVisitor) {
// avoid service exception handling and logging
removeCommand.visit((CommandVisitor) connection);
} else {
connection.service(removeCommand);
}
} catch (IllegalStateException ignoredAsRemoteHasDoneTheJob) {
} catch (Exception e) {
LOG.info("exception on local remove of slow consumer: {}", subToClose.getConsumerInfo().getConsumerId(), e);
}
}}, 1000l);
} catch (Exception e) {
LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
}
}
}

View File

@ -35,16 +35,21 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(value = BlockJUnit4ClassRunner.class)
@RunWith(value = Parameterized.class)
public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
protected long maxTimeSinceLastAck = 5 * 1000;
AbortSlowAckConsumerStrategy strategy;
public AbortSlowAckConsumer0Test(Boolean isTopic) {
super(isTopic);
}
@Override
protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.policy;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
@ -23,18 +24,11 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
@RunWith(value = Parameterized.class)
public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer1Test.class);
protected long maxTimeSinceLastAck = 5 * 1000;
public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.policy;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
@ -23,18 +24,11 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
@RunWith(value = Parameterized.class)
public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer2Test.class);
protected long maxTimeSinceLastAck = 5 * 1000;
public AbortSlowAckConsumer2Test(Boolean topic) {

View File

@ -16,25 +16,13 @@
*/
package org.apache.activemq.broker.policy;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@ -42,20 +30,43 @@ import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
@RunWith(BlockJUnit4ClassRunner.class)
@RunWith(value = Parameterized.class)
public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);
@Parameterized.Parameters(name = "isTopic({0})")
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
}
public AbortSlowConsumer0Test(Boolean isTopic) {
this.topic = isTopic;
}
@Test
public void testRegularConsumerIsNotAborted() throws Exception {
startConsumers(destination);
@ -70,7 +81,7 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
@Test
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
startConsumers(destination);
startConsumers(withPrefetch(2, destination));
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
@ -123,6 +134,12 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
}
}
private Destination withPrefetch(int i, Destination destination) {
String destWithPrefetch =
((ActiveMQDestination) destination).getPhysicalName() + "?consumer.prefetchSize=" + i;
return topic ? new ActiveMQTopic(destWithPrefetch) : new ActiveMQQueue(destWithPrefetch);
}
@Test
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
consumerCount = 10;
@ -145,7 +162,7 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
@Test
public void testAbortAlreadyClosingConsumers() throws Exception {
consumerCount = 1;
startConsumers(destination);
startConsumers(withPrefetch(2, destination));
for (MessageIdList list : consumers.values()) {
list.setProcessingDelay(6 * 1000);
}
@ -164,7 +181,59 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {
@Test
public void testAbortConsumerOnDeadConnection() throws Exception {
// socket proxy on pause, close could hang??
TransportConnector transportConnector = broker.addConnector("tcp://0.0.0.0:0");
transportConnector.setBrokerService(broker);
transportConnector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
transportConnector.start();
SocketProxy socketProxy = new SocketProxy(transportConnector.getPublishableConnectURI());
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(socketProxy.getUrl());
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(4);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
Connection c = connectionFactory.createConnection();
connections.add(c);
c.start();
Session session = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final ActiveMQMessageConsumer messageconsumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
startProducers(destination, 10);
messageconsumer.receive(4000).acknowledge();
assertNotNull(messageconsumer.receive(4000));
assertNotNull(messageconsumer.receive(4000));
assertNotNull(messageconsumer.receive(4000));
// close control command won't get through
socketProxy.pause();
ActiveMQDestination amqDest = (ActiveMQDestination)destination;
ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
(amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
+ amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
final DestinationViewMBean destView = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
assertTrue("Consumer gone from broker view", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("DestView {} comsumerCount {}", destView, destView.getConsumerCount());
return 0 == destView.getConsumerCount();
}
}));
socketProxy.goOn();
assertTrue("consumer was closed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
boolean closed = false;
try {
messageconsumer.receive(400);
} catch (javax.jms.IllegalStateException expected) {
closed = expected.toString().contains("closed");
}
return closed;
}
}));
}
@Override

View File

@ -26,9 +26,8 @@ import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
@ -39,20 +38,13 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class);
@Parameterized.Parameters(name = "{0}-{1}")
@Parameterized.Parameters(name = "abortConnection({0})-isTopic({1})")
public static Collection<Object[]> getTestParameters() {
List<Object[]> testParameters = new ArrayList<Object[]>();
Boolean[] booleanValues = {Boolean.TRUE, Boolean.TRUE};
for (Boolean abortConnection : booleanValues) {
for (Boolean topic : booleanValues) {
Boolean[] pair = {abortConnection, topic};
LOG.info(">>>>> in getTestparameters, adding {}, {}", abortConnection, topic);
testParameters.add(pair);
}
}
return testParameters;
return Arrays.asList(new Object[][]{
{Boolean.TRUE, Boolean.TRUE},
{Boolean.TRUE, Boolean.FALSE},
{Boolean.FALSE, Boolean.TRUE},
{Boolean.FALSE, Boolean.FALSE}});
}
public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) {

View File

@ -16,43 +16,28 @@
*/
package org.apache.activemq.broker.policy;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map.Entry;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import org.apache.activemq.util.MessageIdList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
@RunWith(value = Parameterized.class)
public class AbortSlowConsumer2Test extends AbortSlowConsumerBase {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer2Test.class);
@Parameterized.Parameters(name = "{0}")
@Parameterized.Parameters(name = "isTopic({0})")
public static Collection<Object[]> getTestParameters() {
List<Object[]> testParameters = new ArrayList<Object[]>();
Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE};
for (Boolean topic : booleanValues) {
Boolean[] value = {topic};
testParameters.add(value);
}
return testParameters;
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
}
public AbortSlowConsumer2Test(Boolean isTopic) {
this.topic = isTopic;
}
@Test(timeout = 60 * 1000)
public void testLittleSlowConsumerIsNotAborted() throws Exception {
startConsumers(destination);