From e5bce13316f7e81bb15a12592622df2ea2632a35 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 6 Apr 2018 10:00:43 -0400 Subject: [PATCH] ARTEMIS-1776 Blocked Bridge is not resuming after reconnect This is still part of ARTEMIS-1776 fix, which still part of the same release as we are on now. Hence I'm not opening a new JIRA for this one. --- .../core/server/cluster/impl/BridgeImpl.java | 9 ++ .../cluster/bridge/BridgeTest.java | 148 ++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index e40bc467f9..48f59f4907 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -220,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.server = server; } + /** For tests mainly */ + public boolean isBlockedOnFlowControl() { + return blockedOnFlowControl; + } + public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) { byte[] bytes = new byte[24]; @@ -924,6 +929,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + // need to reset blockedOnFlowControl after creating a new producer + // otherwise in case the bridge was blocked before a previous failure + // this would never resume + blockedOnFlowControl = false; producer = session.createProducer(); session.addFailureListener(BridgeImpl.this); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index ae60a6150a..2d6add7f72 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -75,10 +75,13 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; @@ -251,6 +254,151 @@ public class BridgeTest extends ActiveMQTestBase { System.out.println(timeTaken + "ms"); } + @Test + public void testBlockedBridgeAndReconnect() throws Exception { + long time = System.currentTimeMillis(); + Map server0Params = new HashMap<>(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + + Map server1Params = new HashMap<>(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + server1.getAddressSettingsRepository().clear(); + server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + + server0.getAddressSettingsRepository().clear(); + server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + HashMap connectors = new HashMap<>(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + + final int messageSize = 1024; + + final int numMessages = 1000; + + ArrayList connectorConfig = new ArrayList<>(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024); + + List bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); + List queueConfigs1 = new ArrayList<>(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + + ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + + ClientSession session0 = sf0.createSession(false, true, 0); + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientSession session1 = sf1.createSession(true, true, 0); + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + + session1.start(); + + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + + message.putIntProperty(propKey, i); + + message.getBodyBuffer().writeBytes(bytes); + + producer0.send(message); + + if (i % 100 == 0) { + session0.commit(); + } + } + session0.commit(); + + for (int i = 0; i < numMessages / 2; i++) { + ClientMessage message = consumer1.receive(5000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + session1.commit(); + + BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1"); + + // stop in the middle. wait the bridge to block + Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl); + + session1.close(); + sf1.close(); + + // now restart the server.. the bridge should be reconnecting now + server1.stop(); + server1.start(); + + sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + session1 = sf1.createSession(true, true, 0); + consumer1 = session1.createConsumer(queueName1); + session1.start(); + + // consume the rest of the messages + for (int i = numMessages / 2; i < numMessages; i++) { + ClientMessage message = consumer1.receive(5000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + + + Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount); + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + closeFields(); + if (server0.getConfiguration().isPersistenceEnabled()) { + assertEquals(0, loadQueues(server0).size()); + } + long timeTaken = System.currentTimeMillis() - time; + System.out.println(timeTaken + "ms"); + } + public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception { Map server0Params = new HashMap<>(); server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);