ARTEMIS-1776 Blocked Bridge is not resuming after reconnect
This is still part of ARTEMIS-1776 fix, which still part of the same release as we are on now. Hence I'm not opening a new JIRA for this one.
This commit is contained in:
parent
57b9d979f5
commit
e5bce13316
|
@ -220,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
this.server = server;
|
this.server = server;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** For tests mainly */
|
||||||
|
public boolean isBlockedOnFlowControl() {
|
||||||
|
return blockedOnFlowControl;
|
||||||
|
}
|
||||||
|
|
||||||
public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
|
public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
|
||||||
byte[] bytes = new byte[24];
|
byte[] bytes = new byte[24];
|
||||||
|
|
||||||
|
@ -924,6 +929,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// need to reset blockedOnFlowControl after creating a new producer
|
||||||
|
// otherwise in case the bridge was blocked before a previous failure
|
||||||
|
// this would never resume
|
||||||
|
blockedOnFlowControl = false;
|
||||||
producer = session.createProducer();
|
producer = session.createProducer();
|
||||||
session.addFailureListener(BridgeImpl.this);
|
session.addFailureListener(BridgeImpl.this);
|
||||||
|
|
||||||
|
|
|
@ -75,10 +75,13 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
|
@ -251,6 +254,151 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||||
System.out.println(timeTaken + "ms");
|
System.out.println(timeTaken + "ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockedBridgeAndReconnect() throws Exception {
|
||||||
|
long time = System.currentTimeMillis();
|
||||||
|
Map<String, Object> server0Params = new HashMap<>();
|
||||||
|
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
|
||||||
|
|
||||||
|
Map<String, Object> server1Params = new HashMap<>();
|
||||||
|
addTargetParameters(server1Params);
|
||||||
|
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
|
||||||
|
server1.getAddressSettingsRepository().clear();
|
||||||
|
server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
|
||||||
|
|
||||||
|
server0.getAddressSettingsRepository().clear();
|
||||||
|
server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
|
||||||
|
|
||||||
|
final String testAddress = "testAddress";
|
||||||
|
final String queueName0 = "queue0";
|
||||||
|
final String forwardAddress = "forwardAddress";
|
||||||
|
final String queueName1 = "queue1";
|
||||||
|
|
||||||
|
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
|
||||||
|
|
||||||
|
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
|
||||||
|
|
||||||
|
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
|
||||||
|
connectors.put(server1tc.getName(), server1tc);
|
||||||
|
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||||
|
|
||||||
|
final int messageSize = 1024;
|
||||||
|
|
||||||
|
final int numMessages = 1000;
|
||||||
|
|
||||||
|
ArrayList<String> connectorConfig = new ArrayList<>();
|
||||||
|
connectorConfig.add(server1tc.getName());
|
||||||
|
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
|
||||||
|
|
||||||
|
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||||
|
bridgeConfigs.add(bridgeConfiguration);
|
||||||
|
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
|
||||||
|
|
||||||
|
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
|
||||||
|
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
|
||||||
|
queueConfigs0.add(queueConfig0);
|
||||||
|
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
|
||||||
|
|
||||||
|
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
|
||||||
|
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
|
||||||
|
queueConfigs1.add(queueConfig1);
|
||||||
|
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
|
||||||
|
|
||||||
|
server1.start();
|
||||||
|
server0.start();
|
||||||
|
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
|
||||||
|
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
|
||||||
|
|
||||||
|
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
|
||||||
|
|
||||||
|
ClientSession session0 = sf0.createSession(false, true, 0);
|
||||||
|
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
|
||||||
|
|
||||||
|
ClientSession session1 = sf1.createSession(true, true, 0);
|
||||||
|
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||||
|
|
||||||
|
|
||||||
|
session1.start();
|
||||||
|
|
||||||
|
final byte[] bytes = new byte[messageSize];
|
||||||
|
|
||||||
|
final SimpleString propKey = new SimpleString("testkey");
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
ClientMessage message = session0.createMessage(true);
|
||||||
|
|
||||||
|
message.putIntProperty(propKey, i);
|
||||||
|
|
||||||
|
message.getBodyBuffer().writeBytes(bytes);
|
||||||
|
|
||||||
|
producer0.send(message);
|
||||||
|
|
||||||
|
if (i % 100 == 0) {
|
||||||
|
session0.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session0.commit();
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessages / 2; i++) {
|
||||||
|
ClientMessage message = consumer1.receive(5000);
|
||||||
|
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
|
||||||
|
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||||
|
|
||||||
|
message.acknowledge();
|
||||||
|
}
|
||||||
|
session1.commit();
|
||||||
|
|
||||||
|
BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
|
||||||
|
|
||||||
|
// stop in the middle. wait the bridge to block
|
||||||
|
Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl);
|
||||||
|
|
||||||
|
session1.close();
|
||||||
|
sf1.close();
|
||||||
|
|
||||||
|
// now restart the server.. the bridge should be reconnecting now
|
||||||
|
server1.stop();
|
||||||
|
server1.start();
|
||||||
|
|
||||||
|
sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
|
||||||
|
session1 = sf1.createSession(true, true, 0);
|
||||||
|
consumer1 = session1.createConsumer(queueName1);
|
||||||
|
session1.start();
|
||||||
|
|
||||||
|
// consume the rest of the messages
|
||||||
|
for (int i = numMessages / 2; i < numMessages; i++) {
|
||||||
|
ClientMessage message = consumer1.receive(5000);
|
||||||
|
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
|
||||||
|
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||||
|
|
||||||
|
message.acknowledge();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount);
|
||||||
|
|
||||||
|
Assert.assertNull(consumer1.receiveImmediate());
|
||||||
|
|
||||||
|
session0.close();
|
||||||
|
|
||||||
|
session1.close();
|
||||||
|
|
||||||
|
sf0.close();
|
||||||
|
|
||||||
|
sf1.close();
|
||||||
|
|
||||||
|
closeFields();
|
||||||
|
if (server0.getConfiguration().isPersistenceEnabled()) {
|
||||||
|
assertEquals(0, loadQueues(server0).size());
|
||||||
|
}
|
||||||
|
long timeTaken = System.currentTimeMillis() - time;
|
||||||
|
System.out.println(timeTaken + "ms");
|
||||||
|
}
|
||||||
|
|
||||||
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
|
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
|
||||||
Map<String, Object> server0Params = new HashMap<>();
|
Map<String, Object> server0Params = new HashMap<>();
|
||||||
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
|
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
|
||||||
|
|
Loading…
Reference in New Issue