ARTEMIS-1902 Ensure ServerConsumer close done once

Calling close multiple times on ServerConsumer can result in multiple
notifications being routed around the cluster.  This causes cluster
topology info to become skewed.  Which affects a number of components
such as message redistribution, metrics and can eventually cause OOM
should multiple queues be redistributing at the same time.
This commit is contained in:
Martyn Taylor 2018-06-01 16:33:18 +01:00 committed by Clebert Suconic
parent b3354a0cbe
commit dde60b136a
5 changed files with 108 additions and 22 deletions

View File

@ -18,9 +18,11 @@ package org.apache.activemq.artemis.core.postoffice;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
public class QueueInfo implements Serializable {
@ -38,7 +40,9 @@ public class QueueInfo implements Serializable {
private List<SimpleString> filterStrings;
private int numberOfConsumers;
private volatile int consumersCount = 0;
private static final AtomicIntegerFieldUpdater<QueueInfo> consumerUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueInfo.class, "consumersCount");
private final int distance;
@ -99,15 +103,23 @@ public class QueueInfo implements Serializable {
}
public int getNumberOfConsumers() {
return numberOfConsumers;
return consumerUpdater.get(this);
}
public void incrementConsumers() {
numberOfConsumers++;
consumerUpdater.incrementAndGet(this);
}
public void decrementConsumers() {
numberOfConsumers--;
consumerUpdater.getAndUpdate(this, value -> {
if (value > 0) {
return value--;
} else {
ActiveMQServerLogger.LOGGER.consumerCountError("Tried to decrement consumer count below 0: " + this);
return value;
}
});
}
public boolean matchesAddress(SimpleString address) {
@ -144,7 +156,7 @@ public class QueueInfo implements Serializable {
", filterStrings=" +
filterStrings +
", numberOfConsumers=" +
numberOfConsumers +
consumersCount +
", distance=" +
distance +
"]";

View File

@ -376,7 +376,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
filterStrings.remove(filterString);
}
if (info.getNumberOfConsumers() == 0) {
// The consumer count should never be < 0 but we should catch here just in case.
if (info.getNumberOfConsumers() <= 0) {
if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: HDR_DISTANCE not defined");
return;

View File

@ -1946,4 +1946,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.TRACE)
@Message(id = 224094, value = "Quorum vote result await is interrupted", format = Message.Format.MESSAGE_FORMAT)
void quorumVoteAwaitInterrupted();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
void consumerCountError(String reason);
}

View File

@ -157,6 +157,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private boolean anycast = false;
private boolean isClosed = false;
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@ -477,7 +479,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
public void close(final boolean failed) throws Exception {
public synchronized void close(final boolean failed) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
isClosed = true;
if (logger.isTraceEnabled()) {
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}

View File

@ -29,6 +29,7 @@ import java.util.Collection;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
@ -51,15 +52,13 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
private static final int NUMBER_OF_SERVERS = 2;
private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
// I'm taking any number that /2 = Odd
// to avoid perfect roundings and making sure messages are evenly distributed
private static final int NUMBER_OF_MESSAGES = 77 * 2;
@Parameterized.Parameters(name = "protocol={0}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}});
return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
}
@Parameterized.Parameter(0)
@ -103,14 +102,19 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
private ConnectionFactory getJmsConnectionFactory(int node) {
if (protocol.equals("AMQP")) {
return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
} else {
} else if (protocol.equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
} else if (protocol.equals("CORE")) {
return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
} else {
Assert.fail("Protocol " + protocol + " unkown");
return null;
}
}
private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) {
for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values()) {
for (MessageFlowRecord record : ((ClusterConnectionImpl) clusterConnection).getRecords().values()) {
record.getBridge().pause();
}
}
@ -123,8 +127,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
Connection[] connection = new Connection[NUMBER_OF_SERVERS];
Session[] session = new Session[NUMBER_OF_SERVERS];
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
Session[] session = new Session[NUMBER_OF_SERVERS];
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
// this will pre create consumers to make sure messages are distributed evenly without redistribution
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
@ -142,10 +146,9 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
pauseClusteringBridges(servers[0]);
// sending Messages.. they should be load balanced
{
ConnectionFactory cf = getJmsConnectionFactory(0);
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@ -180,12 +183,72 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
}
@Test
public void testExpireRedistributed() throws Exception {
public void testRedistributionStoppedWithNoRemoteConsumers() throws Exception {
startServers(MessageLoadBalancingType.ON_DEMAND);
ConnectionFactory factory = getJmsConnectionFactory(1);
// Wait for cluster nodes to sync
waitForBindings(0, "queues.0", 1, 0, true);
waitForBindings(1, "queues.0", 1, 0, true);
waitForBindings(0, "queues.0", 1, 0, false);
waitForBindings(1, "queues.0", 1, 0, false);
// Create CFs
ConnectionFactory cf0 = getJmsConnectionFactory(0);
ConnectionFactory cf1 = getJmsConnectionFactory(1);
// Create Consumers
Connection cn0 = cf0.createConnection();
Session sn0 = cn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c0 = sn0.createConsumer(sn0.createQueue(queueName.toString()));
cn0.start();
Connection cn1 = cf1.createConnection();
Session sn1 = cn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = sn1.createConsumer(sn0.createQueue(queueName.toString()));
cn1.start();
MessageProducer pd = sn0.createProducer(sn0.createQueue(queueName.toString()));
// Wait for cluster nodes to sync consumer count
waitForBindings(0, "queues.0", 1, 1, false);
waitForBindings(1, "queues.0", 1, 1, false);
// Start queue redistributor
c0.close();
// Send Messages to node1.
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
pd.send(sn0.createTextMessage("hello " + i));
}
// Ensure the messages are redistributed from node0 to node1.
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
assertNotNull(c1.receive(1000));
}
// Close client on node1. This should make the node0 stop redistributing.
c1.close();
sn1.close();
cn1.close();
// Send more messages (these should stay in node0)
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
pd.send(sn0.createTextMessage("hello " + i));
}
// Messages should stay in node 1 and note get redistributed.
assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
}
@Test
public void testExpireRedistributed() throws Exception {
startServers(MessageLoadBalancingType.ON_DEMAND);
ConnectionFactory factory = getJmsConnectionFactory(1);
waitForBindings(0, "queues.0", 1, 0, true);
waitForBindings(1, "queues.0", 1, 0, true);
@ -193,10 +256,9 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
waitForBindings(0, "queues.0", 1, 0, false);
waitForBindings(1, "queues.0", 1, 0, false);
// sending Messages..
{
ConnectionFactory cf = getJmsConnectionFactory(0);
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@ -212,7 +274,6 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
// time to let stuff expire
Thread.sleep(200);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
@ -253,6 +314,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[0].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
servers[1].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
}
protected void stopServers() throws Exception {
@ -280,5 +343,4 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
return configuration;
}
}