This commit is contained in:
Clebert Suconic 2018-06-04 09:47:05 -04:00
commit 2baf377568
5 changed files with 108 additions and 22 deletions

View File

@ -18,9 +18,11 @@ package org.apache.activemq.artemis.core.postoffice;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
public class QueueInfo implements Serializable { public class QueueInfo implements Serializable {
@ -38,7 +40,9 @@ public class QueueInfo implements Serializable {
private List<SimpleString> filterStrings; private List<SimpleString> filterStrings;
private int numberOfConsumers; private volatile int consumersCount = 0;
private static final AtomicIntegerFieldUpdater<QueueInfo> consumerUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueInfo.class, "consumersCount");
private final int distance; private final int distance;
@ -99,15 +103,23 @@ public class QueueInfo implements Serializable {
} }
public int getNumberOfConsumers() { public int getNumberOfConsumers() {
return numberOfConsumers; return consumerUpdater.get(this);
} }
public void incrementConsumers() { public void incrementConsumers() {
numberOfConsumers++; consumerUpdater.incrementAndGet(this);
} }
public void decrementConsumers() { public void decrementConsumers() {
numberOfConsumers--;
consumerUpdater.getAndUpdate(this, value -> {
if (value > 0) {
return value--;
} else {
ActiveMQServerLogger.LOGGER.consumerCountError("Tried to decrement consumer count below 0: " + this);
return value;
}
});
} }
public boolean matchesAddress(SimpleString address) { public boolean matchesAddress(SimpleString address) {
@ -144,7 +156,7 @@ public class QueueInfo implements Serializable {
", filterStrings=" + ", filterStrings=" +
filterStrings + filterStrings +
", numberOfConsumers=" + ", numberOfConsumers=" +
numberOfConsumers + consumersCount +
", distance=" + ", distance=" +
distance + distance +
"]"; "]";

View File

@ -376,7 +376,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
filterStrings.remove(filterString); filterStrings.remove(filterString);
} }
if (info.getNumberOfConsumers() == 0) { // The consumer count should never be < 0 but we should catch here just in case.
if (info.getNumberOfConsumers() <= 0) {
if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) { if (!props.containsProperty(ManagementHelper.HDR_DISTANCE)) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: HDR_DISTANCE not defined"); logger.debug("PostOffice notification / CONSUMER_CLOSED: HDR_DISTANCE not defined");
return; return;

View File

@ -1946,4 +1946,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.TRACE) @LogMessage(level = Logger.Level.TRACE)
@Message(id = 224094, value = "Quorum vote result await is interrupted", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224094, value = "Quorum vote result await is interrupted", format = Message.Format.MESSAGE_FORMAT)
void quorumVoteAwaitInterrupted(); void quorumVoteAwaitInterrupted();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT)
void consumerCountError(String reason);
} }

View File

@ -157,6 +157,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private boolean anycast = false; private boolean anycast = false;
private boolean isClosed = false;
// Constructors --------------------------------------------------------------------------------- // Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id, public ServerConsumerImpl(final long id,
@ -477,7 +479,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
@Override @Override
public void close(final boolean failed) throws Exception { public synchronized void close(final boolean failed) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
isClosed = true;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
} }

View File

@ -29,6 +29,7 @@ import java.util.Collection;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
@ -51,15 +52,13 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
private static final int NUMBER_OF_SERVERS = 2; private static final int NUMBER_OF_SERVERS = 2;
private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
// I'm taking any number that /2 = Odd // I'm taking any number that /2 = Odd
// to avoid perfect roundings and making sure messages are evenly distributed // to avoid perfect roundings and making sure messages are evenly distributed
private static final int NUMBER_OF_MESSAGES = 77 * 2; private static final int NUMBER_OF_MESSAGES = 77 * 2;
@Parameterized.Parameters(name = "protocol={0}") @Parameterized.Parameters(name = "protocol={0}")
public static Collection getParameters() { public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}}); return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
} }
@Parameterized.Parameter(0) @Parameterized.Parameter(0)
@ -103,14 +102,19 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
private ConnectionFactory getJmsConnectionFactory(int node) { private ConnectionFactory getJmsConnectionFactory(int node) {
if (protocol.equals("AMQP")) { if (protocol.equals("AMQP")) {
return new JmsConnectionFactory("amqp://localhost:" + (61616 + node)); return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
} else { } else if (protocol.equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
} else if (protocol.equals("CORE")) {
return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node)); return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
} else {
Assert.fail("Protocol " + protocol + " unkown");
return null;
} }
} }
private void pauseClusteringBridges(ActiveMQServer server) throws Exception { private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) { for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) {
for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values()) { for (MessageFlowRecord record : ((ClusterConnectionImpl) clusterConnection).getRecords().values()) {
record.getBridge().pause(); record.getBridge().pause();
} }
} }
@ -123,8 +127,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS]; ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
Connection[] connection = new Connection[NUMBER_OF_SERVERS]; Connection[] connection = new Connection[NUMBER_OF_SERVERS];
Session[] session = new Session[NUMBER_OF_SERVERS]; Session[] session = new Session[NUMBER_OF_SERVERS];
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS]; MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
// this will pre create consumers to make sure messages are distributed evenly without redistribution // this will pre create consumers to make sure messages are distributed evenly without redistribution
for (int node = 0; node < NUMBER_OF_SERVERS; node++) { for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
@ -142,10 +146,9 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
pauseClusteringBridges(servers[0]); pauseClusteringBridges(servers[0]);
// sending Messages.. they should be load balanced // sending Messages.. they should be load balanced
{ {
ConnectionFactory cf = getJmsConnectionFactory(0); ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection(); Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@ -180,12 +183,72 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
} }
@Test @Test
public void testExpireRedistributed() throws Exception { public void testRedistributionStoppedWithNoRemoteConsumers() throws Exception {
startServers(MessageLoadBalancingType.ON_DEMAND); startServers(MessageLoadBalancingType.ON_DEMAND);
ConnectionFactory factory = getJmsConnectionFactory(1); ConnectionFactory factory = getJmsConnectionFactory(1);
// Wait for cluster nodes to sync
waitForBindings(0, "queues.0", 1, 0, true);
waitForBindings(1, "queues.0", 1, 0, true);
waitForBindings(0, "queues.0", 1, 0, false);
waitForBindings(1, "queues.0", 1, 0, false);
// Create CFs
ConnectionFactory cf0 = getJmsConnectionFactory(0);
ConnectionFactory cf1 = getJmsConnectionFactory(1);
// Create Consumers
Connection cn0 = cf0.createConnection();
Session sn0 = cn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c0 = sn0.createConsumer(sn0.createQueue(queueName.toString()));
cn0.start();
Connection cn1 = cf1.createConnection();
Session sn1 = cn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = sn1.createConsumer(sn0.createQueue(queueName.toString()));
cn1.start();
MessageProducer pd = sn0.createProducer(sn0.createQueue(queueName.toString()));
// Wait for cluster nodes to sync consumer count
waitForBindings(0, "queues.0", 1, 1, false);
waitForBindings(1, "queues.0", 1, 1, false);
// Start queue redistributor
c0.close();
// Send Messages to node1.
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
pd.send(sn0.createTextMessage("hello " + i));
}
// Ensure the messages are redistributed from node0 to node1.
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
assertNotNull(c1.receive(1000));
}
// Close client on node1. This should make the node0 stop redistributing.
c1.close();
sn1.close();
cn1.close();
// Send more messages (these should stay in node0)
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
pd.send(sn0.createTextMessage("hello " + i));
}
// Messages should stay in node 1 and note get redistributed.
assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
}
@Test
public void testExpireRedistributed() throws Exception {
startServers(MessageLoadBalancingType.ON_DEMAND);
ConnectionFactory factory = getJmsConnectionFactory(1);
waitForBindings(0, "queues.0", 1, 0, true); waitForBindings(0, "queues.0", 1, 0, true);
waitForBindings(1, "queues.0", 1, 0, true); waitForBindings(1, "queues.0", 1, 0, true);
@ -193,10 +256,9 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
waitForBindings(0, "queues.0", 1, 0, false); waitForBindings(0, "queues.0", 1, 0, false);
waitForBindings(1, "queues.0", 1, 0, false); waitForBindings(1, "queues.0", 1, 0, false);
// sending Messages.. // sending Messages..
{ {
ConnectionFactory cf = getJmsConnectionFactory(0); ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection(); Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@ -212,7 +274,6 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
// time to let stuff expire // time to let stuff expire
Thread.sleep(200); Thread.sleep(200);
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry")); MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
@ -253,6 +314,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory()); servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory()); servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[0].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
servers[1].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
} }
protected void stopServers() throws Exception { protected void stopServers() throws Exception {
@ -280,5 +343,4 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
return configuration; return configuration;
} }
} }