diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 15a1b54a17..0b70c31caa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -18,15 +18,12 @@ package org.apache.activemq.artemis.core.server.cluster.impl; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; @@ -44,14 +41,8 @@ public class Redistributor implements Consumer { private final PostOffice postOffice; - private final Executor executor; - - private final int batchSize; - private final Queue queue; - private int count; - private final long sequentialID; // a Flush executor here is happening inside another executor. @@ -61,9 +52,7 @@ public class Redistributor implements Consumer { public Redistributor(final Queue queue, final StorageManager storageManager, - final PostOffice postOffice, - final Executor executor, - final int batchSize) { + final PostOffice postOffice) { this.queue = queue; this.sequentialID = storageManager.generateID(); @@ -71,10 +60,6 @@ public class Redistributor implements Consumer { this.storageManager = storageManager; this.postOffice = postOffice; - - this.executor = executor; - - this.batchSize = batchSize; } @Override @@ -103,39 +88,17 @@ public class Redistributor implements Consumer { } public synchronized void start() { - active = true; + this.active = true; } public synchronized void stop() throws Exception { - active = false; - - boolean ok = flushExecutor(); - - if (!ok) { - ActiveMQServerLogger.LOGGER.errorStoppingRedistributor(); - } + this.active = false; } public synchronized void close() { - boolean ok = flushExecutor(); - - if (!ok) { - throw new IllegalStateException("Timed out waiting for executor to complete"); - } - active = false; } - private boolean flushExecutor() { - try { - boolean ok = pendingRuns.await(10000); - return ok; - } catch (InterruptedException e) { - ActiveMQServerLogger.LOGGER.failedToFlushExecutor(e); - return false; - } - } - @Override public synchronized HandleStatus handle(final MessageReference reference) throws Exception { if (!active) { @@ -154,41 +117,9 @@ public class Redistributor implements Consumer { return HandleStatus.BUSY; } - if (!reference.getMessage().isLargeMessage()) { + postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); - postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); - - ackRedistribution(reference, tx); - } else { - active = false; - executor.execute(new Runnable() { - @Override - public void run() { - try { - - postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); - - ackRedistribution(reference, tx); - - synchronized (Redistributor.this) { - active = true; - - count++; - - queue.deliverAsync(); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID()); - try { - tx.rollback(); - } catch (Exception e2) { - // Nothing much we can do now - ActiveMQServerLogger.LOGGER.failedToRollback(e2); - } - } - } - }); - } + ackRedistribution(reference, tx); return HandleStatus.HANDLED; } @@ -198,68 +129,12 @@ public class Redistributor implements Consumer { // no op } - private void internalExecute(final Runnable runnable) { - pendingRuns.countUp(); - executor.execute(new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } finally { - pendingRuns.countDown(); - } - } - }); - } - private void ackRedistribution(final MessageReference reference, final Transaction tx) throws Exception { reference.handled(); queue.acknowledge(tx, reference); tx.commit(); - - storageManager.afterCompleteOperations(new IOCallback() { - - @Override - public void onError(final int errorCode, final String errorMessage) { - ActiveMQServerLogger.LOGGER.ioErrorRedistributing(errorCode, errorMessage); - } - - @Override - public void done() { - execPrompter(); - } - }); - } - - private void execPrompter() { - count++; - - // We use >= as the large message redistribution will set count to max_int - // so we are use the prompter will get called - if (count >= batchSize) { - // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very - // long time in the case there are many messages in the queue - active = false; - - executor.execute(new Prompter()); - - count = 0; - } - - } - - private class Prompter implements Runnable { - - @Override - public void run() { - synchronized (Redistributor.this) { - active = true; - - queue.deliverAsync(); - } - } } /* (non-Javadoc) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 9e63cbd9cc..bb21bb2b76 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -141,8 +141,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private static final AtomicLongFieldUpdater consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp"); private static final AtomicReferenceFieldUpdater filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter"); - public static final int REDISTRIBUTOR_BATCH_SIZE = 100; - public static final int NUM_PRIORITIES = 10; public static final int MAX_DELIVERIES_IN_LOOP = 1000; @@ -1532,7 +1530,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); } } else { - internalAddRedistributor(executor); + internalAddRedistributor(); } } @@ -3257,12 +3255,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return messageReferences.removeWithID(serverID, id); } - private void internalAddRedistributor(final ArtemisExecutor executor) { + private void internalAddRedistributor() { if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) { if (logger.isTraceEnabled()) { logger.trace("QueueImpl::Adding redistributor on queue " + this.toString()); } - redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)); + redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice)); redistributor.consumer.start(); consumers.add(redistributor); hasUnMatchedPending = false; @@ -4130,7 +4128,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void run() { synchronized (QueueImpl.this) { - internalAddRedistributor(executor1); + internalAddRedistributor(); clearRedistributorFuture(); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 5237ee13be..7c02b525c1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -147,6 +147,7 @@ import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.PortCheckRule; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.RunnableEx; import org.apache.activemq.artemis.utils.ThreadDumpUtil; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -228,6 +229,31 @@ public abstract class ActiveMQTestBase extends Assert { // There is a verification about thread leakages. We only fail a single thread when this happens private static Set alreadyFailedThread = new HashSet<>(); + private LinkedList runAfter; + + protected synchronized void runAfter(RunnableEx run) { + Assert.assertNotNull(run); + if (runAfter == null) { + runAfter = new LinkedList(); + } + runAfter.add(run); + } + + @After + public void runAfter() { + if (runAfter != null) { + runAfter.forEach((r) -> { + try { + r.run(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + } + } + + + private final Collection servers = new ArrayList<>(); private final Collection locators = new ArrayList<>(); private final Collection sessionFactories = new ArrayList<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java index 5f4d2d8d73..427564de18 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java @@ -49,21 +49,15 @@ import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; -import org.apache.activemq.artemis.utils.RetryRule; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { - - @Rule - public RetryRule retryRule = new RetryRule(2); - private static final int NUMBER_OF_SERVERS = 2; private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); @@ -166,6 +160,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { { ConnectionFactory cf = getJmsConnectionFactory(0); Connection cn = cf.createConnection(); + runAfter(cn::close); Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); @@ -217,11 +212,13 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { // Create Consumers Connection cn0 = cf0.createConnection(); + runAfter(cn0::close); Session sn0 = cn0.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer c0 = sn0.createConsumer(sn0.createQueue(queueName.toString())); cn0.start(); Connection cn1 = cf1.createConnection(); + runAfter(cn1::close); Session sn1 = cn1.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer c1 = sn1.createConsumer(sn0.createQueue(queueName.toString())); cn1.start(); @@ -258,8 +255,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { cn0.close(); // Messages should stay in node 1 and note get redistributed. - assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount()); - assertEquals(0, servers[1].locateQueue(queueName).getMessageCount()); + Wait.assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName)::getMessageCount); + Wait.assertEquals(0, servers[1].locateQueue(queueName)::getMessageCount); } @Test @@ -293,6 +290,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { Thread.sleep(200); Connection connection = factory.createConnection(); + runAfter(connection::close); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry")); @@ -322,6 +320,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { for (int node = 0; node < NUMBER_OF_SERVERS; node++) { factory[node] = getJmsConnectionFactory(node); connection[node] = factory[node].createConnection(); + runAfter(connection[node]::close); session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE); consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString())); } @@ -400,6 +399,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { for (int node = 0; node < NUMBER_OF_SERVERS; node++) { factory[node] = getJmsConnectionFactory(node); connection[node] = factory[node].createConnection(); + runAfter(connection[node]::close); session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE); consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString())); } @@ -413,8 +413,6 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { if (protocol.equals("AMQP")) { - - ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616"); locator.setMinLargeMessageSize(1024); ClientSessionFactory coreFactory = locator.createSessionFactory(); @@ -446,13 +444,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { StringBuffer stringbuffer = new StringBuffer(); stringbuffer.append("hello"); - if (i % 3 == 0) { - // making 1/3 of the messages to be large message - for (int j = 0; j < 300 * 1024; j++) { - stringbuffer.append(" "); - } - } - pd.send(sn.createTextMessage(stringbuffer.toString())); + Message message = sn.createTextMessage(stringbuffer.toString()); + pd.send(message); } cn.close(); @@ -476,7 +469,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { for (int i = 0; i < messageCount; i++) { Message msg = messageConsumer.receive(5000); - Assert.assertNotNull(msg); + Assert.assertNotNull("did not receive message at " + i, msg); } // this means no more messages received diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 360f1a0830..2c7b92a206 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -1177,16 +1177,16 @@ public class MessageRedistributionTest extends ClusterTestBase { waitForBindings(1, "queues.testaddress", 2, 1, false); waitForBindings(2, "queues.testaddress", 2, 1, false); - send(0, "queues.testaddress", QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, false, null); + send(0, "queues.testaddress", 200, false, null); removeConsumer(0); addConsumer(1, 1, "queue0", null); Queue queue = servers[1].locateQueue(SimpleString.toSimpleString("queue0")); Assert.assertNotNull(queue); - Wait.waitFor(() -> queue.getMessageCount() == QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2); + Wait.waitFor(() -> queue.getMessageCount() == 200); - for (int i = 0; i < QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2; i++) { + for (int i = 0; i < 200; i++) { ClientMessage message = consumers[1].getConsumer().receive(5000); Assert.assertNotNull(message); message.acknowledge();