From 9bf9e1c052e3e0cf080a0bc851f2b537b5d6a530 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 21 Mar 2014 12:21:19 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5114 - force local removeInfo after connection controll consumer remove command so consumer is always removed from the broker, independent of the consumer state --- .../policy/AbortSlowConsumerStrategy.java | 31 ++++- .../policy/AbortSlowAckConsumer0Test.java | 7 +- .../policy/AbortSlowAckConsumer1Test.java | 8 +- .../policy/AbortSlowAckConsumer2Test.java | 8 +- .../broker/policy/AbortSlowConsumer0Test.java | 123 ++++++++++++++---- .../broker/policy/AbortSlowConsumer1Test.java | 22 +--- .../broker/policy/AbortSlowConsumer2Test.java | 29 +---- 7 files changed, 147 insertions(+), 81 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java index 867d2b246d..fe6ba44e88 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java @@ -30,6 +30,8 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transport.InactivityIOException; import org.slf4j.Logger; @@ -161,19 +163,44 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable connection.getConnectionId(), subscriptions.size()); } } else { - // just abort each consumer by telling it to stop + // just abort each consumer for (Subscription subscription : subscriptions) { + final Subscription subToClose = subscription; LOG.info("aborting slow consumer: {} for destination:{}", subscription.getConsumerInfo().getConsumerId(), subscription.getActiveMQDestination()); + // tell the remote consumer to close try { ConsumerControl stopConsumer = new ConsumerControl(); stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId()); stopConsumer.setClose(true); connection.dispatchAsync(stopConsumer); } catch (Exception e) { - LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId()); + LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e); + } + + // force a local remove in case remote is unresponsive + try { + scheduler.executeAfterDelay(new Runnable() { + @Override + public void run() { + try { + RemoveInfo removeCommand = subToClose.getConsumerInfo().createRemoveCommand(); + if (connection instanceof CommandVisitor) { + // avoid service exception handling and logging + removeCommand.visit((CommandVisitor) connection); + } else { + connection.service(removeCommand); + } + } catch (IllegalStateException ignoredAsRemoteHasDoneTheJob) { + } catch (Exception e) { + LOG.info("exception on local remove of slow consumer: {}", subToClose.getConsumerInfo().getConsumerId(), e); + } + }}, 1000l); + + } catch (Exception e) { + LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java index fa14142af3..3cfd59545d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java @@ -35,16 +35,21 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(value = BlockJUnit4ClassRunner.class) +@RunWith(value = Parameterized.class) public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class); protected long maxTimeSinceLastAck = 5 * 1000; AbortSlowAckConsumerStrategy strategy; + public AbortSlowAckConsumer0Test(Boolean isTopic) { + super(isTopic); + } + @Override protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() { AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java index 8812a6bb9a..6d3e970c78 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.policy; +import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; @@ -23,18 +24,11 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.ConnectionFactory; @RunWith(value = Parameterized.class) public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test { - private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer1Test.class); - protected long maxTimeSinceLastAck = 5 * 1000; public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java index 63c97735d7..948613e0c8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.policy; +import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; @@ -23,18 +24,11 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.ConnectionFactory; @RunWith(value = Parameterized.class) public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test { - private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer2Test.class); - protected long maxTimeSinceLastAck = 5 * 1000; public AbortSlowAckConsumer2Test(Boolean topic) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java index 373b155b5d..9f2344350d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java @@ -16,25 +16,13 @@ */ package org.apache.activemq.broker.policy; -import org.apache.activemq.JmsMultipleClientsTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; -import org.apache.activemq.broker.jmx.DestinationViewMBean; -import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.util.MessageIdList; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import javax.jms.ExceptionListener; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; @@ -42,20 +30,43 @@ import javax.management.InstanceNotFoundException; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.junit.Assert.*; -@RunWith(BlockJUnit4ClassRunner.class) +@RunWith(value = Parameterized.class) public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class); + @Parameterized.Parameters(name = "isTopic({0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + public AbortSlowConsumer0Test(Boolean isTopic) { + this.topic = isTopic; + } + @Test public void testRegularConsumerIsNotAborted() throws Exception { startConsumers(destination); @@ -70,7 +81,7 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { @Test public void testSlowConsumerIsAbortedViaJmx() throws Exception { underTest.setMaxSlowDuration(60*1000); // so jmx does the abort - startConsumers(destination); + startConsumers(withPrefetch(2, destination)); Entry consumertoAbort = consumers.entrySet().iterator().next(); consumertoAbort.getValue().setProcessingDelay(8 * 1000); for (Connection c : connections) { @@ -123,6 +134,12 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { } } + private Destination withPrefetch(int i, Destination destination) { + String destWithPrefetch = + ((ActiveMQDestination) destination).getPhysicalName() + "?consumer.prefetchSize=" + i; + return topic ? new ActiveMQTopic(destWithPrefetch) : new ActiveMQQueue(destWithPrefetch); + } + @Test public void testOnlyOneSlowConsumerIsAborted() throws Exception { consumerCount = 10; @@ -145,7 +162,7 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { @Test public void testAbortAlreadyClosingConsumers() throws Exception { consumerCount = 1; - startConsumers(destination); + startConsumers(withPrefetch(2, destination)); for (MessageIdList list : consumers.values()) { list.setProcessingDelay(6 * 1000); } @@ -164,7 +181,59 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { @Test public void testAbortConsumerOnDeadConnection() throws Exception { - // socket proxy on pause, close could hang?? + TransportConnector transportConnector = broker.addConnector("tcp://0.0.0.0:0"); + transportConnector.setBrokerService(broker); + transportConnector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); + transportConnector.start(); + SocketProxy socketProxy = new SocketProxy(transportConnector.getPublishableConnectURI()); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(socketProxy.getUrl()); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(4); + connectionFactory.setPrefetchPolicy(prefetchPolicy); + Connection c = connectionFactory.createConnection(); + connections.add(c); + c.start(); + Session session = c.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final ActiveMQMessageConsumer messageconsumer = (ActiveMQMessageConsumer) session.createConsumer(destination); + startProducers(destination, 10); + + messageconsumer.receive(4000).acknowledge(); + assertNotNull(messageconsumer.receive(4000)); + assertNotNull(messageconsumer.receive(4000)); + assertNotNull(messageconsumer.receive(4000)); + + // close control command won't get through + socketProxy.pause(); + + ActiveMQDestination amqDest = (ActiveMQDestination)destination; + ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName=" + + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost"); + + final DestinationViewMBean destView = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true); + + assertTrue("Consumer gone from broker view", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("DestView {} comsumerCount {}", destView, destView.getConsumerCount()); + return 0 == destView.getConsumerCount(); + } + })); + + socketProxy.goOn(); + + assertTrue("consumer was closed", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + boolean closed = false; + try { + messageconsumer.receive(400); + } catch (javax.jms.IllegalStateException expected) { + closed = expected.toString().contains("closed"); + } + return closed; + } + })); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java index 6fe1e477ae..e17b362cf6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java @@ -26,9 +26,8 @@ import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.Session; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; @@ -39,20 +38,13 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase { private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer1Test.class); - @Parameterized.Parameters(name = "{0}-{1}") + @Parameterized.Parameters(name = "abortConnection({0})-isTopic({1})") public static Collection getTestParameters() { - - List testParameters = new ArrayList(); - Boolean[] booleanValues = {Boolean.TRUE, Boolean.TRUE}; - for (Boolean abortConnection : booleanValues) { - for (Boolean topic : booleanValues) { - Boolean[] pair = {abortConnection, topic}; - LOG.info(">>>>> in getTestparameters, adding {}, {}", abortConnection, topic); - testParameters.add(pair); - } - } - - return testParameters; + return Arrays.asList(new Object[][]{ + {Boolean.TRUE, Boolean.TRUE}, + {Boolean.TRUE, Boolean.FALSE}, + {Boolean.FALSE, Boolean.TRUE}, + {Boolean.FALSE, Boolean.FALSE}}); } public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java index 2cbea5bd4b..72630278cc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java @@ -16,43 +16,28 @@ */ package org.apache.activemq.broker.policy; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map.Entry; +import javax.jms.Connection; +import javax.jms.MessageConsumer; import org.apache.activemq.util.MessageIdList; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map.Entry; @RunWith(value = Parameterized.class) public class AbortSlowConsumer2Test extends AbortSlowConsumerBase { - private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer2Test.class); - - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "isTopic({0})") public static Collection getTestParameters() { - - List testParameters = new ArrayList(); - Boolean[] booleanValues = {Boolean.TRUE, Boolean.FALSE}; - for (Boolean topic : booleanValues) { - Boolean[] value = {topic}; - testParameters.add(value); - } - - return testParameters; + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); } public AbortSlowConsumer2Test(Boolean isTopic) { this.topic = isTopic; } - @Test(timeout = 60 * 1000) public void testLittleSlowConsumerIsNotAborted() throws Exception { startConsumers(destination);