diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 4458ac968c..6c928d4f2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -103,7 +103,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final SimpleString forwardingAddress; - private final java.util.Map refs = new LinkedHashMap<>(); + final java.util.Map refs = new LinkedHashMap<>(); private final Transformer transformer; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c0ed1bd911..5f0d5d2eb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3307,10 +3307,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final boolean ignoreRedeliveryDelay) throws Exception { if (internalQueue) { + if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery"); } // no DLQ check on internal queues + // we just need to return statistics on the delivering + decDelivering(reference); return new Pair<>(true, false); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeTestAccessor.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeTestAccessor.java new file mode 100644 index 0000000000..d597927ed5 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeTestAccessor.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.cluster.impl; + +import java.util.Map; +import java.util.function.Function; + +import org.apache.activemq.artemis.core.server.MessageReference; + +/** it will provide accessors for Bridge during testing. + * Do not use this outside of the context of UnitTesting. */ +public class BridgeTestAccessor { + + public static Map getRefs(BridgeImpl bridge) { + return bridge.refs; + } + + public static boolean withinRefs(BridgeImpl bridge, Function, Boolean> function) { + Map refs = getRefs(bridge); + return function.apply(refs); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java index aef4534cce..78f23522b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java @@ -18,6 +18,10 @@ package org.apache.activemq.artemis.tests.integration.cluster.bridge; import java.util.ArrayList; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -27,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -79,6 +84,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase { ClientProducer producer = session0.createProducer("queues.testaddress"); int NUMBER_OF_MESSAGES = 100; + int REPEATS = 5; Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size()); @@ -91,16 +97,55 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase { Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size()); ArrayList originalmembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers()); - for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { - ClientMessage msg = session0.createMessage(true); - producer.send(msg); - session0.commit(); + AtomicInteger errors = new AtomicInteger(0); - if (i == 17) { - bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!")); - } + // running the loop a couple of times + for (int repeat = 0; repeat < REPEATS; repeat++) { + CountDownLatch latchSent = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + ClientMessage msg = session0.createMessage(true); + producer.send(msg); + latchSent.countDown(); + if (i % 10 == 0) { + session0.commit(); + } + } + session0.commit(); + } catch (Exception e) { + errors.incrementAndGet(); + // not really supposed to happen + e.printStackTrace(); + } + }); + t.start(); + + Executor executorFail = servers[0].getExecutorFactory().getExecutor(); + + Assert.assertTrue(latchSent.await(10, TimeUnit.SECONDS)); + + Wait.waitFor(() -> BridgeTestAccessor.withinRefs(bridge, (refs) -> { + synchronized (refs) { + if (refs.size() > 0) { + executorFail.execute(() -> { + bridge.connectionFailed(new ActiveMQException("bye"), false); + }); + return true; + } else { + return false; + } + } + }), 500, 1); + + Wait.assertEquals(0L, bridge.getQueue()::getMessageCount, 5000, 1); + Wait.assertEquals(0, bridge.getQueue()::getDeliveringCount, 5000, 1); + + t.join(5000); } + + Assert.assertEquals(0, errors.get()); Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size()); ArrayList afterReconnectedMembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers()); @@ -126,7 +171,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase { int cons0Count = 0, cons1Count = 0; while (true) { - ClientMessage msg = consumers[0].getConsumer().receive(1000); + ClientMessage msg = consumers[0].getConsumer().receiveImmediate(); if (msg == null) { break; } @@ -136,7 +181,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase { } while (true) { - ClientMessage msg = consumers[1].getConsumer().receive(1000); + ClientMessage msg = consumers[1].getConsumer().receiveImmediate(); if (msg == null) { break; } @@ -145,7 +190,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase { session1.commit(); } - Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count); + Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES * REPEATS, cons0Count + cons1Count); session0.commit(); session1.commit(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java new file mode 100644 index 0000000000..eed9483763 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/NettyBridgeReconnectTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.bridge; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class NettyBridgeReconnectTest extends BridgeTestBase { + + private static final Logger log = Logger.getLogger(NettyBridgeReconnectTest.class); + + final String bridgeName = "bridge1"; + final String testAddress = "testAddress"; + final int confirmationWindowSize = 100 * 1024; + + Map server0Params; + Map server1Params; + + ActiveMQServer server0; + ActiveMQServer server1; + + private TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc"); + private TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc"); + + private Map connectors; + private ArrayList staticConnectors; + + @After + public void destroyServer() throws Exception { + if (server1 != null) { + server1.stop(); + } + + if (server0 != null) { + server0.stop(); + } + } + + private void server1Start() throws Exception { + server1.start(); + } + + public void server0Start() throws Exception { + server0 = createActiveMQServer(0, server0Params, isNetty(), null); + server0.getConfiguration().setConnectorConfigurations(connectors); + server0.start(); + } + + @Before + public void setServer() throws Exception { + server0Params = new HashMap<>(); + server1Params = new HashMap<>(); + connectors = new HashMap<>(); + + server1 = createActiveMQServer(1, isNetty(), server1Params); + connectors.put(server1tc.getName(), server1tc); + + connectors.put(server0tc.getName(), server0tc); + connectors.put(server1tc.getName(), server1tc); + + staticConnectors = new ArrayList<>(); + staticConnectors.add(server1tc.getName()); + } + + protected boolean isNetty() { + return true; + } + + /** + * @return + */ + private String getConnector() { + return NETTY_CONNECTOR_FACTORY; + } + + @Test + public void testFailoverWhileSending() throws Exception { + internalFailoverWhileSending(false); + } + + @Test + public void testFailoverWhileSendingInternalQueue() throws Exception { + internalFailoverWhileSending(true); + } + + private void internalFailoverWhileSending(boolean forceInternal) throws Exception { + + connectors.put(server0tc.getName(), server0tc); + connectors.put(server1tc.getName(), server1tc); + + server1.getConfiguration().setConnectorConfigurations(connectors); + + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName).setQueueName(testAddress).setForwardingAddress(testAddress).setRetryInterval(10).setReconnectAttempts(-1).setReconnectAttemptsOnSameNode(-1).setConfirmationWindowSize(confirmationWindowSize).setPassword(CLUSTER_PASSWORD); + List bridgeConnectors = new ArrayList<>(); + bridgeConnectors.add(server0tc.getName()); + bridgeConfiguration.setStaticConnectors(bridgeConnectors); + + bridgeConfiguration.setQueueName(testAddress); + List bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server1.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + server0Start(); + server1Start(); + org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue(testAddress); + if (forceInternal) { + // pretending this is an internal queue + // as the check acks will play it differently + serverQueue1.setInternalQueue(true); + } + + int TRANSACTIONS = 10; + int NUM_MESSAGES = 1000; + + try (ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) CFUtil.createConnectionFactory("core", "tcp://localhost:61617"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + Queue queue = session.createQueue(testAddress); + MessageProducer producer = session.createProducer(queue); + int sequenceID = 0; + for (int j = 0; j < TRANSACTIONS; j++) { + for (int i = 0; i < NUM_MESSAGES; i++) { + producer.send(session.createTextMessage("" + (sequenceID++))); + } + session.commit(); + } + } + + //remoteServer0 = SpawnedVMSupport.spawnVM(NettyBridgeReconnectTest.class.getName(), getTestDir()); + Wait.waitFor(() -> serverQueue1.getDeliveringCount() > 10, 300_000); + + BridgeImpl bridge = null; + for (Consumer c : serverQueue1.getConsumers()) { + System.out.println("Consumer " + c); + if (c instanceof BridgeImpl) { + bridge = (BridgeImpl) c; + } + } + Assert.assertNotNull(bridge); + + Executor executorFail = server1.getExecutorFactory().getExecutor(); + + { + BridgeImpl finalBridge = bridge; + Wait.assertTrue(() -> BridgeTestAccessor.withinRefs(finalBridge, (refs) -> { + synchronized (refs) { + if (refs.size() > 100) { + executorFail.execute(() -> { + finalBridge.connectionFailed(new ActiveMQException("bye"), false); + }); + return true; + } else { + return false; + } + } + })); + } + + AtomicInteger queuesTested = new AtomicInteger(0); + // Everything is transferred, so everything should be zeroed on the addressses + server1.getPostOffice().getAllBindings().filter(b -> b instanceof LocalQueueBinding).forEach((b) -> { + queuesTested.incrementAndGet(); + try { + Wait.assertEquals(0, ((LocalQueueBinding) b).getQueue().getPagingStore()::getAddressSize); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + }); + Assert.assertTrue(queuesTested.get() > 0); + + Wait.assertEquals(0, serverQueue1::getDeliveringCount); + Wait.assertEquals(0, serverQueue1::getMessageCount); + + try (ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = session.createQueue(testAddress); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + HashSet received = new HashSet<>(); + for (int j = 0; j < TRANSACTIONS * NUM_MESSAGES; j++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + received.add(message.getText()); + //Assert.assertEquals("" + j, message.getText()); + } + Assert.assertEquals(TRANSACTIONS * NUM_MESSAGES, received.size()); + Assert.assertNull(consumer.receiveNoWait()); + } + + + queuesTested.set(0); + // After everything is consumed.. we will check the sizes being 0 + server0.getPostOffice().getAllBindings().filter(b -> b instanceof LocalQueueBinding).forEach((b) -> { + queuesTested.incrementAndGet(); + try { + Wait.assertEquals(0, ((LocalQueueBinding) b).getQueue().getPagingStore()::getAddressSize); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + }); + Assert.assertTrue(queuesTested.get() > 0); + + + } + + @Override + protected ActiveMQServer createActiveMQServer(final int id, + final Map params, + final boolean netty, + final NodeManager nodeManager) throws Exception { + ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager); + + QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST); + List queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + + CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration(); + addressConfiguration.setName(testAddress).addRoutingType(RoutingType.ANYCAST); + addressConfiguration.addQueueConfiguration(new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST)); + server.getConfiguration().addAddressConfiguration(addressConfiguration); + + server.getConfiguration().setPersistIDCache(true); + return server; + } + +}